You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/02/27 23:30:35 UTC

svn commit: r512456 [1/2] - in /incubator/ode/trunk: ./ axis2-war/ axis2/ axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/ bpel-runtime/src/main/jav...

Author: mriou
Date: Tue Feb 27 14:30:33 2007
New Revision: 512456

URL: http://svn.apache.org/viewvc?view=rev&rev=512456
Log:
Several modifications on JPA and some message exchange stuff:

1. More JPA queries and mapping implemented. At leat ODE starts and you can deploy processes. As for execution, we're not there yet.
2. Direct process to process invocation (without passing by any messaging layer) is now reliable. Messages can't get lost in the way.
3. Improved the MyRole mex to add more reliability as well. When a message arrives we save the message and its mex and process in a separate scheduled transaction. So if something wrong occurs, we're almost sure to be safe anyway (unless saving the message itself failed).

Added:
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrSetProperty.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MexProperty.java
    incubator/ode/trunk/dao-jpa/src/main/resources/
    incubator/ode/trunk/dao-jpa/src/main/resources/META-INF/
    incubator/ode/trunk/dao-jpa/src/main/resources/META-INF/persistence.xml
Removed:
    incubator/ode/trunk/dao-jpa-ojpa/src/main/java/org/apache/ode/dao/jpa/ojpa/BPELDAOConnectionFactoryImpl.java
    incubator/ode/trunk/dao-jpa-ojpa/src/main/resources/META-INF/persistence.xml
Modified:
    incubator/ode/trunk/axis2-war/pom.xml
    incubator/ode/trunk/axis2/pom.xml
    incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
    incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
    incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
    incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
    incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
    incubator/ode/trunk/bpel-runtime/pom.xml
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/JaxpVariableResolver.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/XmlDataDaoImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
    incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
    incubator/ode/trunk/bpel-test/src/test/java/org/apache/ode/test/BPELTest.java
    incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
    incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
    incubator/ode/trunk/dao-jpa-ojpa-derby/pom.xml
    incubator/ode/trunk/dao-jpa-ojpa-derby/src/main/descriptors/persistence.derby.xml
    incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/InsertObjectTest.java
    incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/SelectObjectTest.java
    incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/resources/META-INF/persistence.xml
    incubator/ode/trunk/dao-jpa-ojpa/pom.xml
    incubator/ode/trunk/dao-jpa/pom.xml
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelationSetDAOImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/EventDAOImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
    incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ScopeDAOImpl.java
    incubator/ode/trunk/distro-axis2/src/examples/DynPartner/deploy.xml
    incubator/ode/trunk/jbi/pom.xml
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
    incubator/ode/trunk/pom.xml

Modified: incubator/ode/trunk/axis2-war/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2-war/pom.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/axis2-war/pom.xml (original)
+++ incubator/ode/trunk/axis2-war/pom.xml Tue Feb 27 14:30:33 2007
@@ -311,13 +311,7 @@
         <!-- JPA -->
         <dependency>
             <groupId>org.apache.ode</groupId>
-            <artifactId>ode-dao-jpa-ojpa</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>ode-dao-jpa</artifactId>
-                    <groupId>org.apache.ode</groupId>
-                </exclusion>
-            </exclusions>
+            <artifactId>ode-dao-jpa</artifactId>
         </dependency>
         <dependency>
             <groupId>javax.persistence</groupId>

Modified: incubator/ode/trunk/axis2/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/pom.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/axis2/pom.xml (original)
+++ incubator/ode/trunk/axis2/pom.xml Tue Feb 27 14:30:33 2007
@@ -73,7 +73,7 @@
 
         <dependency>
             <groupId>org.apache.ode</groupId>
-            <artifactId>ode-dao-jpa-ojpa</artifactId>
+            <artifactId>ode-dao-jpa</artifactId>
             <scope>test</scope>
         </dependency>
         

Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Tue Feb 27 14:30:33 2007
@@ -56,11 +56,7 @@
         if (__log.isDebugEnabled())
             __log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName());
 
-        // TODO Add a port in MessageExchange (for now there's only service) to be able to find the
-        // TODO right service. For now we'll just lookup by service+portType but if we have severalt ports
-        // TODO for the same portType that will not work.
-        ODEService service = _server.getService(myRoleMessageExchange.getServiceName(),
-                myRoleMessageExchange.getPortType().getQName());
-        service.notifyResponse(myRoleMessageExchange);
+        // Nothing to do, no callback is necessary, the client just synchornizes itself with the
+        // mex reply when invoking the engine.
     }
 }

Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Tue Feb 27 14:30:33 2007
@@ -19,19 +19,6 @@
 
 package org.apache.ode.axis2;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.StringTokenizer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.Definition;
-import javax.xml.namespace.QName;
-
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.AxisService;
@@ -58,6 +45,18 @@
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.fs.TempFileManager;
 
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.Definition;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * Server class called by our Axis hooks to handle all ODE lifecycle management.
  * 
@@ -445,7 +444,7 @@
 
         _server.setDaoConnectionFactory(_daoCF);
         _server.setEndpointReferenceContext(new EndpointReferenceContextImpl(this));
-        _server.setMessageExchangeContext(new P2PMexContextImpl(this, new MessageExchangeContextImpl(this), _scheduler));
+        _server.setMessageExchangeContext(new MessageExchangeContextImpl(this));
         _server.setBindingContext(new BindingContextImpl(this, _store));
         _server.setScheduler(_scheduler);
         if (_odeConfig.isDehydrationEnabled()) {
@@ -506,5 +505,4 @@
             __log.debug("Ignoring store event: " + pse);
         }
     }
-
 }

Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Tue Feb 27 14:30:33 2007
@@ -27,13 +27,15 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.axis2.util.OMUtils;
 import org.apache.ode.axis2.util.SoapMessageConverter;
 import org.apache.ode.bpel.epr.EndpointFactory;
 import org.apache.ode.bpel.epr.MutableEndpoint;
 import org.apache.ode.bpel.epr.WSAEndpoint;
-import org.apache.ode.bpel.iapi.*;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.Namespaces;
@@ -47,9 +49,8 @@
 import javax.wsdl.extensions.UnknownExtensibilityElement;
 import javax.wsdl.extensions.soap.SOAPAddress;
 import javax.xml.namespace.QName;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A running service, encapsulates the Axis service, its receivers and our
@@ -67,7 +68,6 @@
     private Definition _wsdlDef;
     private QName _serviceName;
     private String _portName;
-    private Map<String, ResponseCallback> _waitingCallbacks;
     private WSAEndpoint _serviceRef;
     private boolean _isReplicateEmptyNS = false;
     private SoapMessageConverter _converter;
@@ -80,7 +80,6 @@
         _wsdlDef = def;
         _serviceName = serviceName;
         _portName = portName;
-        _waitingCallbacks = Collections.synchronizedMap(new HashMap<String, ResponseCallback>());
         _serviceRef = EndpointFactory.convertToWSA(createServiceRef(genEPRfromWSDL(_wsdlDef, serviceName, portName)));
         _converter = new SoapMessageConverter(OMAbstractFactory.getSOAP11Factory(), def, serviceName, portName,
                 _isReplicateEmptyNS);
@@ -91,7 +90,7 @@
             throws AxisFault {
         boolean success = true;
         MyRoleMessageExchange odeMex = null;
-        ResponseCallback callback = null;
+        Future responseFuture = null;
         try {
             _txManager.begin();
             if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
@@ -111,18 +110,12 @@
                 readHeader(msgContext, odeMex);
                 odeRequest.setMessage(msgEl);
 
-                // Preparing a callback just in case we would need one.
-                if (odeMex.getOperation().getOutput() != null) {
-                    callback = new ResponseCallback();
-                    _waitingCallbacks.put(odeMex.getClientId(), callback);
-                }
-
                 if (__log.isDebugEnabled()) {
                     __log.debug("Invoking ODE using MEX " + odeMex);
                     __log.debug("Message content:  " + DOMUtils.domToString(odeRequest.getMessage()));
                 }
                 // Invoking ODE
-                odeMex.invoke(odeRequest);
+                responseFuture = odeMex.invoke(odeRequest);
             } else {
                 success = false;
             }
@@ -149,66 +142,61 @@
                     throw new OdeFault("Rollback failed", e);
                 }
             }
+        }
 
-            if (odeMex.getOperation() != null) {
-                boolean timeout = false;
-                // Invocation response could be delayed, if so we have to wait
-                // for it.
-                if (odeMex.getMessageExchangePattern() == MessageExchangePattern.REQUEST_RESPONSE &&
-                        odeMex.getStatus() == MessageExchange.Status.ASYNC) {
-                    odeMex = callback.getResponse(TIMEOUT);
-                    if (odeMex == null)
-                        timeout = true;
-                } else {
-                    // Callback wasn't necessary, cleaning up
-                    _waitingCallbacks.remove(odeMex.getMessageExchangeId());
-                }
+        if (odeMex.getOperation() != null) {
+            // Waits for the response to arrive
+            try {
+                responseFuture.get(TIMEOUT, TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                String errorMsg = "Timeout or execution error when waiting for response to MEX "
+                        + odeMex + " " + e.toString();
+                __log.error(errorMsg);
+                __log.error(e);
+                throw new OdeFault(errorMsg);
+            }
 
-                if (outMsgContext != null) {
-                    SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
-                    outMsgContext.setEnvelope(envelope);
-
-                    // Hopefully we have a response
-                    __log.debug("Handling response for MEX " + odeMex);
-                    if (timeout) {
-                        __log.error("Timeout when waiting for response to MEX " + odeMex);
-                        success = false;
-                    } else {
-                        boolean commit = false;
+            if (outMsgContext != null) {
+                SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+                outMsgContext.setEnvelope(envelope);
+
+                // Hopefully we have a response
+                __log.debug("Handling response for MEX " + odeMex);
+                boolean commit = false;
+                try {
+                    if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
+                    _txManager.begin();
+                } catch (Exception ex) {
+                    throw new OdeFault("Error starting transaction!", ex);
+                }
+                try {
+                    // Refreshing the message exchange
+                    odeMex = (MyRoleMessageExchange) _server.getEngine()
+                            .getMessageExchange(odeMex.getMessageExchangeId());
+                    onResponse(odeMex, outMsgContext);
+                    commit = true;
+                } catch (AxisFault af) {
+                    __log.error("Error processing response for MEX " + odeMex, af);
+                    commit = true;
+                    throw af;
+                } catch (Exception e) {
+                    __log.error("Error processing response for MEX " + odeMex, e);
+                    throw new OdeFault("An exception occured when invoking ODE.", e);
+                } finally {
+                    if (commit)
                         try {
-                            if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
-                            _txManager.begin();
-                        } catch (Exception ex) {
-                            throw new OdeFault("Error starting transaction!", ex);
+                            if (__log.isDebugEnabled()) __log.debug("Comitting transaction.");
+                            _txManager.commit();
+                        } catch (Exception e) {
+                            throw new OdeFault("Commit failed!", e);
                         }
+                    else
                         try {
-                            onResponse(odeMex, outMsgContext);
-                            commit = true;
-                        } catch (AxisFault af) {
-                            __log.error("Error processing response for MEX " + odeMex, af);
-                            commit = true;
-                            throw af;
-                        } catch (Exception e) {
-                            __log.error("Error processing response for MEX " + odeMex, e);
-                            throw new OdeFault("An exception occured when invoking ODE.", e);
-                        } finally {
-                            if (odeMex!= null) odeMex.release();
-                            else __log.warn("Couldn't release a message exchange, it's null.");
-                            if (commit)
-                                try {
-                                    if (__log.isDebugEnabled()) __log.debug("Comitting transaction.");
-                                    _txManager.commit();
-                                } catch (Exception e) {
-                                    throw new OdeFault("Commit failed!", e);
-                                }
-                            else
-                                try {
-                                    _txManager.rollback();
-                                } catch (Exception ex) {
-                                    throw new OdeFault("Rollback failed!", ex);
-                                }
+                            _txManager.rollback();
+                        } catch (Exception ex) {
+                            throw new OdeFault("Rollback failed!", ex);
                         }
-                    }
+
                 }
             }
         }
@@ -216,16 +204,6 @@
             throw new OdeFault("Message was either unroutable or timed out!");
     }
 
-    public void notifyResponse(MyRoleMessageExchange mex) {
-        ResponseCallback callback = _waitingCallbacks.get(mex.getClientId());
-        if (callback == null) {
-            __log.error("No active service for message exchange: " + mex);
-        } else {
-            callback.onResponse(mex);
-            _waitingCallbacks.remove(mex.getClientId());
-        }
-    }
-
     public boolean respondsTo(QName serviceName, QName portTypeName) {
         boolean result = _serviceName.equals(serviceName);
         result = result
@@ -308,35 +286,6 @@
 
     public AxisService getAxisService() {
         return _axisService;
-    }
-
-    static class ResponseCallback {
-        private MyRoleMessageExchange _mmex;
-
-        private boolean _timedout;
-
-        synchronized boolean onResponse(MyRoleMessageExchange mmex) {
-            if (_timedout) {
-                return false;
-            }
-            _mmex = mmex;
-            this.notify();
-            return true;
-        }
-
-        synchronized MyRoleMessageExchange getResponse(long timeout) {
-            long etime = timeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeout;
-            long ctime;
-            try {
-                while (_mmex == null && (ctime = System.currentTimeMillis()) < etime) {
-                    this.wait(etime - ctime);
-                }
-            } catch (InterruptedException ie) {
-                // ignore
-            }
-            _timedout = _mmex == null;
-            return _mmex;
-        }
     }
 
     /**

Modified: incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java (original)
+++ incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java Tue Feb 27 14:30:33 2007
@@ -20,6 +20,7 @@
 package org.apache.ode.bpel.iapi;
 
 import javax.xml.namespace.QName;
+import java.util.concurrent.Future;
 
 /**
  * Extension of the {@link org.apache.ode.bpel.iapi.MessageExchange} interface
@@ -67,7 +68,7 @@
      * {@link MessageExchangeContext#onAsyncReply(MyRoleMessageExchange)} when
      * the response become available.
      */
-    void invoke(Message request);
+    Future invoke(Message request);
 
     /**
      * Complete the message, exchange: indicates that the client has receive the
@@ -78,7 +79,7 @@
     /**
      * Associate a client key with this message exchange.
      * 
-     * @param bs
+     * @param clientKey
      */
     void setClientId(String clientKey);
 

Modified: incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java (original)
+++ incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java Tue Feb 27 14:30:33 2007
@@ -85,7 +85,12 @@
 
     /**
      * Set state of last message sent/received.
+<<<<<<< .mine
+     *
+     * @param status state to be set
+=======
      * @param string state to be set
+>>>>>>> .r511955
      */
     void setStatus(String status);
 
@@ -231,6 +236,14 @@
     void setPartnerLink(PartnerLinkDAO plinkDAO);
 
     PartnerLinkDAO getPartnerLink();
+
+    /**
+     * Gets the mex id for the message exchange that has been piped with
+     * this one in a process to process interaction. 
+     * @return
+     */
+    String getPipedMessageExchangeId();
+    void setPipedMessageExchangeId(String mexId);
 
     void release();
 

Modified: incubator/ode/trunk/bpel-runtime/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/pom.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/pom.xml (original)
+++ incubator/ode/trunk/bpel-runtime/pom.xml Tue Feb 27 14:30:33 2007
@@ -144,14 +144,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.ode</groupId>
-            <artifactId>ode-dao-jpa-ojpa</artifactId>
+            <artifactId>ode-dao-jpa</artifactId>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>ode-dao-jpa</artifactId>
-                    <groupId>org.apache.ode</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.openjpa</groupId>

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/JaxpVariableResolver.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/JaxpVariableResolver.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/JaxpVariableResolver.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/JaxpVariableResolver.java Tue Feb 27 14:30:33 2007
@@ -19,9 +19,6 @@
 
 package org.apache.ode.bpel.elang.xpath20.runtime;
 
-import javax.xml.namespace.QName;
-import javax.xml.xpath.XPathVariableResolver;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.FaultException;
@@ -37,6 +34,9 @@
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
+import javax.xml.namespace.QName;
+import javax.xml.xpath.XPathVariableResolver;
+
 /**
  * @author mriou <mriou at apache dot org>
  */
@@ -98,7 +98,7 @@
                 if (part != null && part.type instanceof OXsdTypeVarType && ((OXsdTypeVarType)part.type).simple)
                 	return getSimpleContent(variableNode,((OXsdTypeVarType)part.type).xsdType);
 
-                
+
                 // Saxon expects a node list, this nodelist should contain exactly one item, the attribute
                 // value
                 return new SingletonNodeList(variableNode);

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java Tue Feb 27 14:30:33 2007
@@ -36,14 +36,22 @@
 import org.apache.ode.utils.xsd.Duration;
 import org.apache.ode.utils.xsd.XMLCalendar;
 import org.apache.ode.utils.xsl.XslTransformHandler;
-import org.w3c.dom.*;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
 
 import javax.xml.namespace.QName;
 import javax.xml.transform.TransformerFactory;
 import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathExpressionException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * XPath 2.0 Expression Language run-time subsytem.
@@ -94,6 +102,7 @@
         Object someRes = evaluate(cexp, ctx, XPathConstants.NODESET);
         if (someRes instanceof List) {
             result = (List) someRes;
+            __log.debug("Returned list of size " + result.size());
             if ((result.size() == 1) && !(result.get(0) instanceof Node)) {
               Document d = DOMUtils.newDocument();
               // Giving our node a parent just in case it's an LValue expression
@@ -105,6 +114,7 @@
             }
         } else if (someRes instanceof NodeList) {
             NodeList retVal = (NodeList) someRes;
+            __log.debug("Returned node list of size " + retVal.getLength());
             result = new ArrayList(retVal.getLength());
             for(int m = 0; m < retVal.getLength(); ++m) {
                 Node val = retVal.item(m);

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Feb 27 14:30:33 2007
@@ -25,10 +25,17 @@
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.BpelEngine;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -99,7 +106,8 @@
         _contexts = contexts;
     }
 
-    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation)
+    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService,
+                                                       String operation, String pipedMexId)
             throws BpelEngineException {
 
         BpelProcess target = route(targetService, null);
@@ -116,6 +124,7 @@
         dao.setCallee(targetService);
         dao.setStatus(Status.NEW.toString());
         dao.setOperation(operation);
+        dao.setPipedMessageExchangeId(pipedMexId);
         MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao);
 
         if (target != null) {
@@ -125,6 +134,10 @@
         return mex;
     }
 
+    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) {
+        return createMessageExchange(clientKey, targetService, operation, null);        
+    }
+
     public MessageExchange getMessageExchange(String mexId) throws BpelEngineException {
         MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
         if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
@@ -267,29 +280,31 @@
         // to a grinding halt.
         try {
 
-            ProcessInstanceDAO instance;
-            if (we.isInMem())
-                instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
-            else
-                instance = _contexts.dao.getConnection().getInstance(we.getIID());
-
-            if (instance == null) {
-                __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
-                // nothing we can do, this instance is not in the database, it will
-                // always
-                // fail.
-                return;
+            BpelProcess process;
+            if (we.getProcessId() != null) {
+                process = _activeProcesses.get(we.getProcessId());
+            } else {
+                ProcessInstanceDAO instance;
+                if (we.isInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
+                else instance = _contexts.dao.getConnection().getInstance(we.getIID());
+
+                if (instance == null) {
+                    __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
+                    // nothing we can do, this instance is not in the database, it will
+                    // always
+                    // fail.
+                    return;
+                }
+                ProcessDAO processDao = instance.getProcess();
+                process = _activeProcesses.get(processDao.getProcessId());
             }
 
-            ProcessDAO processDao = instance.getProcess();
-            BpelProcess process = _activeProcesses.get(processDao.getProcessId());
             if (process == null) {
                 // If the process is not active, it means that we should not be
-                // doing
-                // any work on its behalf, therefore we will reschedule the events
-                // for some time in the future (1 minute).
+                // doing any work on its behalf, therefore we will reschedule the
+                // events for some time in the future (1 minute).
                 Date future = new Date(System.currentTimeMillis() + (60 * 1000));
-                __log.info(__msgs.msgReschedulingJobForInactiveProcess(processDao.getProcessId(), jobInfo.jobName, future));
+                __log.info(__msgs.msgReschedulingJobForInactiveProcess(process.getPID(), jobInfo.jobName, future));
                 _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, future);
                 return;
             }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Feb 27 14:30:33 2007
@@ -27,10 +27,20 @@
 import org.apache.ode.bpel.evt.ScopeEvent;
 import org.apache.ode.bpel.explang.ConfigurationException;
 import org.apache.ode.bpel.explang.EvaluationException;
-import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import org.apache.ode.bpel.o.*;
+import org.apache.ode.bpel.o.OElementVarType;
+import org.apache.ode.bpel.o.OExpressionLanguage;
+import org.apache.ode.bpel.o.OMessageVarType;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.bpel.o.Serializer;
 import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
 import org.apache.ode.bpel.runtime.PROCESS;
 import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
@@ -45,7 +55,12 @@
 
 import javax.xml.namespace.QName;
 import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Entry point into the runtime of a BPEL process.
@@ -119,7 +134,6 @@
      * @param mex
      */
     void invokeProcess(MyRoleMessageExchangeImpl mex) {
-        markused();
         PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
         if (target == null) {
             String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
@@ -250,52 +264,57 @@
      */
     public void handleWorkEvent(Map<String, Object> jobData) {
         markused();
-        ProcessInstanceDAO procInstance;
 
         if (__log.isDebugEnabled()) {
             __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobData", jobData }));
         }
 
         WorkEvent we = new WorkEvent(jobData);
-        procInstance = getProcessDAO().getInstance(we.getIID());
-        if (procInstance == null) {
-            if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
-            }
-            return;
-        }
 
-        BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null);
-        switch (we.getType()) {
-        case TIMER:
+        // Process level events
+        if (we.getType().equals(WorkEvent.Type.INVOKE_INTERNAL)) {
             if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
+                __log.debug("InvokeInternal event for mexid " + we.getMexId());
             }
-
-            processInstance.timerEvent(we.getChannel());
-            break;
-        case RESUME:
-            if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
-            }
-
-            processInstance.execute();
-
-            break;
-        case INVOKE_RESPONSE:
-            if (__log.isDebugEnabled()) {
-                __log.debug("InvokeResponse event for iid " + we.getIID());
-            }
-
-            processInstance.invocationResponse(we.getMexId(), we.getChannel());
-            processInstance.execute();
-            break;
-        case MATCHER:
-            if (__log.isDebugEnabled()) {
-                __log.debug("Matcher event for iid " + we.getIID());
+            MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) getEngine().getMessageExchange(we.getMexId());
+            invokeProcess(mex);
+        } else {
+            // Instance level events
+            ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
+            if (procInstance == null) {
+                if (__log.isDebugEnabled()) {
+                    __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
+                }
+                return;
+            }
+
+            BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null);
+            switch (we.getType()) {
+            case TIMER:
+                if (__log.isDebugEnabled()) {
+                    __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
+                }
+                processInstance.timerEvent(we.getChannel());
+                break;
+            case RESUME:
+                if (__log.isDebugEnabled()) {
+                    __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
+                }
+                processInstance.execute();
+                break;
+            case INVOKE_RESPONSE:
+                if (__log.isDebugEnabled()) {
+                    __log.debug("InvokeResponse event for iid " + we.getIID());
+                }
+                processInstance.invocationResponse(we.getMexId(), we.getChannel());
+                processInstance.execute();
+                break;
+            case MATCHER:
+                if (__log.isDebugEnabled()) {
+                    __log.debug("Matcher event for iid " + we.getIID());
+                }
+                processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
             }
-
-            processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
         }
     }
 
@@ -414,6 +433,13 @@
         if (prole == null)
             throw new IllegalStateException("Unknown partner link " + link);
         return prole.getInitialEPR();
+    }
+
+    Endpoint getInitialPartnerRoleEndpoint(OPartnerLink link) {
+        PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(link);
+        if (prole == null)
+            throw new IllegalStateException("Unknown partner link " + link);
+        return prole._initialPartner;
     }
 
     EndpointReference getInitialMyRoleEPR(OPartnerLink link) {

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Feb 27 14:30:33 2007
@@ -23,18 +23,50 @@
 import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.common.ProcessState;
-import org.apache.ode.bpel.dao.*;
-import org.apache.ode.bpel.evt.*;
-import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.dao.CorrelationSetDAO;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.MessageRouteDAO;
+import org.apache.ode.bpel.dao.PartnerLinkDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.dao.ScopeDAO;
+import org.apache.ode.bpel.dao.XmlDataDAO;
+import org.apache.ode.bpel.evt.CorrelationSetWriteEvent;
+import org.apache.ode.bpel.evt.ProcessCompletionEvent;
+import org.apache.ode.bpel.evt.ProcessInstanceEvent;
+import org.apache.ode.bpel.evt.ProcessInstanceStateChangeEvent;
+import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent;
+import org.apache.ode.bpel.evt.ProcessTerminationEvent;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OMessageVarType.Part;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.OScope;
-import org.apache.ode.bpel.runtime.*;
-import org.apache.ode.bpel.runtime.channels.*;
+import org.apache.ode.bpel.runtime.BpelJacobRunnable;
+import org.apache.ode.bpel.runtime.BpelRuntimeContext;
+import org.apache.ode.bpel.runtime.CorrelationSetInstance;
+import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
+import org.apache.ode.bpel.runtime.PROCESS;
+import org.apache.ode.bpel.runtime.PartnerLinkInstance;
+import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.bpel.runtime.VariableInstance;
+import org.apache.ode.bpel.runtime.channels.ActivityRecoveryChannel;
+import org.apache.ode.bpel.runtime.channels.FaultData;
+import org.apache.ode.bpel.runtime.channels.InvokeResponseChannel;
+import org.apache.ode.bpel.runtime.channels.PickResponseChannel;
+import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
 import org.apache.ode.jacob.JacobRunnable;
 import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
 import org.apache.ode.jacob.vpu.JacobVPU;
@@ -485,12 +517,14 @@
 
         MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef);
 
-        MessageExchange.Status prevStatus = MessageExchange.Status.valueOf(mex.getStatus());
-
         MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput()
                 .getMessage().getQName());
         message.setData(msg);
-        mex.setResponse(message);
+
+        MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mex);
+        _bpelProcess.initMyRoleMex(m);
+        m.setResponse(new MessageImpl(message));
+
         if (fault != null) {
             mex.setStatus(MessageExchange.Status.FAULT.toString());
             mex.setFault(fault);
@@ -499,15 +533,35 @@
             mex.setStatus(MessageExchange.Status.RESPONSE.toString());
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
-        MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mex);
-        _bpelProcess.initMyRoleMex(m);
 
-        if (prevStatus == MessageExchange.Status.ASYNC)
-            _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
+        if (mex.getPipedMessageExchangeId() != null) {
+            PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _bpelProcess
+                    .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
+            __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
+            switch (m.getStatus()) {
+                case FAILURE:
+                    // We can't seem to get the failure out of the myrole mex?
+                    pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
+                    break;
+                case FAULT:
+                    Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
+                            .getMessage().getQName());
+                    faultRes.setMessage(m.getResponse().getMessage());
+                    pmex.replyWithFault(m.getFault(), faultRes);
+                    break;
+                case RESPONSE:
+                    Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                    response.setMessage(m.getResponse().getMessage());
+                    pmex.reply(response);
+                    break;
+                default:
+                    __log.debug("Unexpected state: " + m.getStatus());
+                    break;
+            }
+        } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
 
         // send event
         sendEvent(evt);
-
     }
 
     /**
@@ -616,15 +670,15 @@
         // initialized
         // then use the value from bthe deployment descriptor ..
         Element partnerEPR = plinkDAO.getPartnerEPR();
-        EndpointReference partnerEndpoint;
+        EndpointReference partnerEpr;
 
         if (partnerEPR == null) {
-            partnerEndpoint = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink);
+            partnerEpr = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink);
             // In this case, the partner link has not been initialized.
-            if (partnerEndpoint == null)
+            if (partnerEpr == null)
                 throw new FaultException(partnerLink.partnerLink.getOwner().constants.qnUninitializedPartnerRole);
         } else {
-            partnerEndpoint = _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(partnerEPR);
+            partnerEpr = _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(partnerEPR);
         }
 
         if (BpelProcess.__log.isDebugEnabled()) {
@@ -640,7 +694,6 @@
 
         MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(
                 MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
-
         mexDao.setStatus(MessageExchange.Status.NEW.toString());
         mexDao.setOperation(operation.getName());
         mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
@@ -656,14 +709,11 @@
         String mySessionId = plinkDAO.getMySessionId();
         String partnerSessionId = plinkDAO.getPartnerSessionId();
 
-//        mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
-//        mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
         if ( mySessionId != null )
            	mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
         if ( partnerSessionId != null )
            	mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
 
-
         if (__log.isDebugEnabled())
             __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
 
@@ -673,26 +723,50 @@
         message.setType(operation.getInput().getMessage().getQName());
 
         // Get he my-role EPR (if myrole exists) for optional use by partner
-        // (for callback
-        // mechanism).
+        // (for callback mechanism).
         EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess
                 .getInitialMyRoleEPR(partnerLink.partnerLink) : null;
-
         PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao,
-                partnerLink.partnerLink.partnerRolePortType, operation, partnerEndpoint, myRoleEndpoint, _bpelProcess
+                partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, _bpelProcess
                         .getPartnerRoleChannel(partnerLink.partnerLink));
-        _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
 
-        // If we couldn't find the endpoint, then there is no sense
-        // in asking the IL to invoke.
-        if (partnerEndpoint != null) {
-            mexDao.setEPR(partnerEndpoint.toXML().getDocumentElement());
+        Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink);
+        BpelProcess p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, mex.getRequest());
+        if (p2pProcess != null) {
+            // Creating a my mex using the same message id as partner mex to "pipe" them
+            MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
+                    mex.getMessageExchangeId(), partnerEndpoint.serviceName,
+                    operation.getName(), mex.getMessageExchangeId());
+
+            __log.debug("Invoking in a p2p interaction, partnerrole " + mex + " - myrole " + myRoleMex);
+            Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
+            odeRequest.setMessage(outgoingMessage);
+
+            __log.debug("Setting session ids for p2p interaction, mySession "
+                    + partnerSessionId + " - partnerSess " + mySessionId);
+            if ( partnerSessionId != null )
+                   myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId);
+            if ( mySessionId != null )
+                   myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
+
             mex.setStatus(MessageExchange.Status.REQUEST);
-            _bpelProcess._engine._contexts.mexContext.invokePartner(mex);
+            myRoleMex.invoke(odeRequest);
+
+            // Can't expect any sync response
+            mex.replyAsync();
         } else {
-            __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
-            mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
+            // If we couldn't find the endpoint, then there is no sense
+            // in asking the IL to invoke.
+            if (partnerEpr != null) {
+                mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
+                mex.setStatus(MessageExchange.Status.REQUEST);
+                _bpelProcess._engine._contexts.mexContext.invokePartner(mex);
+            } else {
+                __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
+                mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
+            }
         }
+
         evt.setMexId(mexDao.getMessageExchangeId());
         sendEvent(evt);
 

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java Tue Feb 27 14:30:33 2007
@@ -18,15 +18,15 @@
  */
 package org.apache.ode.bpel.engine;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * 
  * WARNING --- EXPERIMENTAL
@@ -43,6 +43,8 @@
     private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo> ();
     
     public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException, TimeoutException {
+        if (iid == null) return;
+
         String thrd = Thread.currentThread().toString();
         if (__log.isDebugEnabled())
             __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu+")");
@@ -79,6 +81,8 @@
     }
     
     public void unlock(Long iid)  {
+        if (iid == null) return;
+
         String thrd = Thread.currentThread().toString();
         if (__log.isDebugEnabled())
             __log.debug(thrd + ": unlock(iid=" + iid + ")");        

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Feb 27 14:30:33 2007
@@ -157,7 +157,7 @@
         _operation = operation;
     }
 
-    protected MessageExchangeDAO getDAO() {
+    MessageExchangeDAO getDAO() {
         return _dao;
     }
 
@@ -178,6 +178,9 @@
         setStatus(Status.RESPONSE);
         getDAO().setFault(null);
         getDAO().setResponse(((MessageImpl)outputMessage)._dao);
+
+        // Meant to be overriden by subclasses when needed
+        responseReceived();
     }
 
     void setFailure(FailureType type, String reason, Element details) throws BpelEngineException {
@@ -240,4 +243,7 @@
         return "MEX["+getDAO().getMessageExchangeId() +"]";
     }
 
+    protected void responseReceived() {
+        // Nothing to do here, just opening the possibility of overriding
+    }
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Feb 27 14:30:33 2007
@@ -25,6 +25,7 @@
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -32,10 +33,21 @@
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
 
 import javax.xml.namespace.QName;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
 
     private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
+    public static final int TIMEOUT = 2 * 60 * 1000;
+
+    private static Map<String, ResponseCallback> _waitingCallbacks =
+            new ConcurrentHashMap<String, ResponseCallback>();
+
 
     public MyRoleMessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO mexdao) {
         super(engine, mexdao);
@@ -86,7 +98,7 @@
         return true;
     }
 
-    public void invoke(Message request) {
+    public Future invoke(Message request) {
         if (request == null) {
             String errmsg = "Must pass non-null message to invoke()!";
             __log.fatal(errmsg);
@@ -97,7 +109,7 @@
         _dao.setStatus(MessageExchange.Status.REQUEST.toString());
 
         if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked))
-            return;
+            return null;
 
         BpelProcess target = _engine.route(getDAO().getCallee(), request);
 
@@ -110,10 +122,22 @@
 
             setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT);
             setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null, null);
+            return null;
         } else {
-            target.invokeProcess(this);
+            // Schedule a new job for invocation
+            WorkEvent we = new WorkEvent();
+            we.setType(WorkEvent.Type.INVOKE_INTERNAL);
+            if (target.isInMemory()) we.setInMem(true);
+            we.setProcessId(target.getPID());
+            we.setMexId(getDAO().getMessageExchangeId());
+
+            ResponseCallback callback = new ResponseCallback();
+            _waitingCallbacks.put(getClientId(), callback);
+
+            setStatus(Status.ASYNC);
+            _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+            return new ResponseFuture(getClientId());
         }
-
     }
 
     public void complete() {
@@ -142,6 +166,83 @@
 
     public boolean isAsynchronous() {
         return true;
+    }
+
+    static class ResponseFuture implements Future {
+        private String _clientId;
+        private boolean _done = false;
+
+        public ResponseFuture(String clientId) {
+            _clientId = clientId;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException();
+        }
+        public Object get() throws InterruptedException, ExecutionException {
+            try {
+                return get(0, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                // If it's thrown it's definitely a bug
+                throw new ExecutionException(e);
+            }
+        }
+        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            ResponseCallback callback = _waitingCallbacks.get(_clientId);
+            if (callback != null) {
+                callback.waitResponse(timeout);
+                _done = true;
+                if (callback._timedout)
+                    throw new TimeoutException("Message exchange " + this + " timed out when waiting for a response!");
+            }
+            return null;
+        }
+        public boolean isCancelled() {
+            return false;
+        }
+        public boolean isDone() {
+            return _done;
+        }
+    }
+
+    protected void responseReceived() {
+        final String cid = getClientId();
+        _engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+            public void afterCompletion(boolean success) {
+                __log.debug("Received myrole mex response callback");
+                ResponseCallback callback = _waitingCallbacks.remove(cid);
+                if (callback != null) callback.responseReceived();
+            }
+            public void beforeCompletion() {
+            }
+        });
+    }
+
+    static class ResponseCallback {
+        private boolean _timedout;
+        private boolean _waiting = true;
+
+        synchronized boolean responseReceived() {
+            if (_timedout) {
+                return false;
+            }
+            _waiting = false;
+            this.notify();
+            return true;
+        }
+
+        synchronized void waitResponse(long timeout) {
+            long etime = timeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeout;
+            long ctime;
+            try {
+                while (_waiting && (ctime = System.currentTimeMillis()) < etime) {
+                    this.wait(etime - ctime);
+                }
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+            _timedout = _waiting;
+        }
     }
 
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Tue Feb 27 14:30:33 2007
@@ -21,6 +21,7 @@
 
 import org.apache.ode.bpel.common.CorrelationKey;
 
+import javax.xml.namespace.QName;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -59,7 +60,7 @@
     }
 
     public enum Type {
-        TIMER, RESUME, INVOKE_RESPONSE, MATCHER
+        TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL
     }
 
     public String getChannel() {
@@ -108,6 +109,14 @@
         Boolean bool = (Boolean) _jobDetail.get("inmem");
         if (bool == null) return false;
         else return bool;
+    }
+
+    public void setProcessId(QName pid) {
+        _jobDetail.put("pid", pid.toString());
+    }
+
+    public QName getProcessId() {
+        return _jobDetail.get("pid") != null? QName.valueOf((String) _jobDetail.get("pid")) : null;
     }
 }
 

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Tue Feb 27 14:30:33 2007
@@ -39,7 +39,8 @@
 	private QName callee;
 	private Properties properties = new Properties();
     private PartnerLinkDAOImpl _plink;
-	
+    private String pipedMessageExchangeId;
+
 	public MessageExchangeDAOImpl(char direction, String messageEchangeId){
 		this.direction = direction;
 		this.messageExchangeId = messageEchangeId;
@@ -241,6 +242,14 @@
             retVal.add((String)e.getKey());
         }
         return retVal;
+    }
+
+    public String getPipedMessageExchangeId() {
+        return pipedMessageExchangeId;
+    }
+
+    public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
+        this.pipedMessageExchangeId = pipedMessageExchangeId;
     }
 
     public void release() {

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/XmlDataDaoImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/XmlDataDaoImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/XmlDataDaoImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/XmlDataDaoImpl.java Tue Feb 27 14:30:33 2007
@@ -8,13 +8,13 @@
 import org.apache.ode.bpel.dao.ScopeDAO;
 import org.apache.ode.bpel.dao.XmlDataDAO;
 import org.apache.ode.utils.DOMUtils;
-import org.w3c.dom.Node;
-import org.w3c.dom.Element;
 import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
 import org.xml.sax.SAXException;
 
-import java.util.Properties;
 import java.io.IOException;
+import java.util.Properties;
 
 
 /**

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java Tue Feb 27 14:30:33 2007
@@ -18,11 +18,6 @@
  */
 package org.apache.ode.bpel.runtime;
 
-import java.util.Collection;
-import java.util.Date;
-
-import javax.xml.namespace.QName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.FaultException;
@@ -42,6 +37,10 @@
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
+import javax.xml.namespace.QName;
+import java.util.Collection;
+import java.util.Date;
+
 /**
  * JacobRunnable that performs the work of the <code>invoke</code> activity.
  */
@@ -112,7 +111,7 @@
                             // TODO: Better error handling
                             throw new RuntimeException(e);
                         }
-                       
+
                         getBpelRuntimeContext().initializeVariable(outputVar, response);
 
                         try {

Modified: incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Feb 27 14:30:33 2007
@@ -18,22 +18,6 @@
  */
 package org.apache.ode.bpel.runtime;
 
-import java.io.File;
-import java.sql.DriverManager;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.iapi.BindingContext;
@@ -48,7 +32,6 @@
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
-import org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.GUID;
@@ -57,6 +40,21 @@
 import org.opentools.minerva.MinervaPool;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.sql.DriverManager;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 
 class MockBpelServer {

Modified: incubator/ode/trunk/bpel-test/src/test/java/org/apache/ode/test/BPELTest.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-test/src/test/java/org/apache/ode/test/BPELTest.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/bpel-test/src/test/java/org/apache/ode/test/BPELTest.java (original)
+++ incubator/ode/trunk/bpel-test/src/test/java/org/apache/ode/test/BPELTest.java Tue Feb 27 14:30:33 2007
@@ -21,7 +21,12 @@
 import junit.framework.TestCase;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.ProcessStore;
+import org.apache.ode.bpel.iapi.ProcessStoreEvent;
+import org.apache.ode.bpel.iapi.ProcessStoreListener;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.test.scheduler.TestScheduler;
@@ -57,7 +62,7 @@
 			emf = Persistence.createEntityManagerFactory("ode-unit-test-embedded");
 			em = emf.createEntityManager();
 			String pr = Persistence.PERSISTENCE_PROVIDER;
-            _cf = new org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl();
+            _cf = new BPELDAOConnectionFactoryImpl();
             server.setDaoConnectionFactory(_cf);
             scheduler = new TestScheduler() {
                 @Override

Modified: incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java (original)
+++ incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java Tue Feb 27 14:30:33 2007
@@ -19,7 +19,11 @@
 
 package org.apache.ode.daohib.bpel;
 
-import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.PartnerLinkDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.HLargeData;
 import org.apache.ode.daohib.bpel.hobj.HMessage;
@@ -115,16 +119,10 @@
         update();
     }
 
-    /**
-     * @see org.apache.ode.sfwk.bapi.dao.MessageExchangeDAO#getCorrelationId()
-     */
     public String getCorrelationId() {
         return _hself.getClientKey();
     }
 
-    /**
-     * @see org.apache.ode.sfwk.bapi.dao.MessageExchangeDAO#setCorrelationId(byte[])
-     */
     public void setCorrelationId(String clientKey) {
         _hself.setClientKey(clientKey);
         update();
@@ -300,8 +298,15 @@
         return Collections.unmodifiableSet(_hself.getProperties().keySet());
     }
 
+    public String getPipedMessageExchangeId() {
+        return _hself.getPipedMessageExchangeId();
+    }
+
+    public void setPipedMessageExchangeId(String mexId) {
+        _hself.setPipedMessageExchangeId(mexId);
+    }
+
     public void release() {
         // no-op for now, could be used to do some cleanup
     }
-
 }

Modified: incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java (original)
+++ incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java Tue Feb 27 14:30:33 2007
@@ -70,6 +70,8 @@
 
     private String _callee;
 
+    private String _pipedMessageExchangeId;
+
     private Map<String, String> _properties = new HashMap<String, String>();
 
     /**
@@ -322,4 +324,14 @@
         return _partnerLink;
     }
 
+    /**
+     * @hibernate.property column="PIPED_ID"
+     */
+    public String getPipedMessageExchangeId() {
+        return _pipedMessageExchangeId;
+    }
+
+    public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
+        _pipedMessageExchangeId = pipedMessageExchangeId;
+    }
 }

Modified: incubator/ode/trunk/dao-jpa-ojpa-derby/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa-ojpa-derby/pom.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa-ojpa-derby/pom.xml (original)
+++ incubator/ode/trunk/dao-jpa-ojpa-derby/pom.xml Tue Feb 27 14:30:33 2007
@@ -64,13 +64,7 @@
 
         <dependency>
             <groupId>org.apache.ode</groupId>
-            <artifactId>ode-dao-jpa-ojpa</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>ode-dao-jpa</artifactId>
-                    <groupId>org.apache.ode</groupId>
-                </exclusion>
-            </exclusions>
+            <artifactId>ode-dao-jpa</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.ode</groupId>

Modified: incubator/ode/trunk/dao-jpa-ojpa-derby/src/main/descriptors/persistence.derby.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa-ojpa-derby/src/main/descriptors/persistence.derby.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa-ojpa-derby/src/main/descriptors/persistence.derby.xml (original)
+++ incubator/ode/trunk/dao-jpa-ojpa-derby/src/main/descriptors/persistence.derby.xml Tue Feb 27 14:30:33 2007
@@ -3,7 +3,6 @@
     <persistence-unit name="ode-unit-test-embedded">
         <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
         <class>org.apache.ode.dao.jpa.ActivityRecoveryDAOImpl</class>
-        <class>org.apache.ode.dao.jpa.BPELDAOConnectionImpl</class>
         <class>org.apache.ode.dao.jpa.CorrelationSetDAOImpl</class>
         <class>org.apache.ode.dao.jpa.CorrelatorDAOImpl</class>
         <class>org.apache.ode.dao.jpa.EventDAOImpl</class>
@@ -16,6 +15,7 @@
         <class>org.apache.ode.dao.jpa.ProcessInstanceDAOImpl</class>
         <class>org.apache.ode.dao.jpa.ScopeDAOImpl</class>
         <class>org.apache.ode.dao.jpa.XmlDataDAOImpl</class>
+        
         <class>org.apache.ode.store.jpa.ProcessConfDaoImpl</class>
         <class>org.apache.ode.store.jpa.ProcessConfPropertyDaoImpl</class>
         <class>org.apache.ode.store.jpa.DeploymentUnitDaoImpl</class>

Modified: incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/InsertObjectTest.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/InsertObjectTest.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/InsertObjectTest.java (original)
+++ incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/InsertObjectTest.java Tue Feb 27 14:30:33 2007
@@ -19,18 +19,8 @@
 
 package org.apache.ode.dao.jpa.test;
 
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import javax.xml.namespace.QName;
-
+import junit.framework.TestCase;
 import org.apache.ode.bpel.common.CorrelationKey;
-import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.CorrelationSetDAO;
 import org.apache.ode.bpel.dao.CorrelatorDAO;
@@ -42,12 +32,16 @@
 import org.apache.ode.bpel.dao.ScopeDAO;
 import org.apache.ode.bpel.dao.ScopeStateEnum;
 import org.apache.ode.bpel.dao.XmlDataDAO;
-import org.apache.ode.dao.jpa.ojpa.*;
 import org.apache.ode.utils.DOMUtils;
-import org.apache.openjpa.persistence.ArgumentException;
 import org.xml.sax.SAXException;
 
-import junit.framework.TestCase;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.xml.namespace.QName;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
 
 public class InsertObjectTest extends TestCase {
 	

Modified: incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/SelectObjectTest.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/SelectObjectTest.java?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/SelectObjectTest.java (original)
+++ incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/java/org/apache/ode/dao/jpa/test/SelectObjectTest.java Tue Feb 27 14:30:33 2007
@@ -19,16 +19,7 @@
 
 package org.apache.ode.dao.jpa.test;
 
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import javax.persistence.Query;
-import javax.xml.namespace.QName;
-
+import junit.framework.TestCase;
 import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -44,10 +35,12 @@
 import org.apache.ode.bpel.dao.ScopeDAO;
 import org.apache.ode.bpel.dao.ScopeStateEnum;
 import org.apache.ode.bpel.dao.XmlDataDAO;
-import org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl;
-import org.apache.ode.utils.DOMUtils;
 
-import junit.framework.TestCase;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.xml.namespace.QName;
+import java.util.Collection;
 
 public class SelectObjectTest extends TestCase {
 	

Modified: incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/resources/META-INF/persistence.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/resources/META-INF/persistence.xml (original)
+++ incubator/ode/trunk/dao-jpa-ojpa-derby/src/test/resources/META-INF/persistence.xml Tue Feb 27 14:30:33 2007
@@ -3,7 +3,6 @@
     <persistence-unit name="ode-unit-test-embedded">
         <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
         <class>org.apache.ode.dao.jpa.ActivityRecoveryDAOImpl</class>
-        <class>org.apache.ode.dao.jpa.BPELDAOConnectionImpl</class>
         <class>org.apache.ode.dao.jpa.CorrelationSetDAOImpl</class>
         <class>org.apache.ode.dao.jpa.CorrelatorDAOImpl</class>
         <class>org.apache.ode.dao.jpa.EventDAOImpl</class>

Modified: incubator/ode/trunk/dao-jpa-ojpa/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa-ojpa/pom.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa-ojpa/pom.xml (original)
+++ incubator/ode/trunk/dao-jpa-ojpa/pom.xml Tue Feb 27 14:30:33 2007
@@ -32,14 +32,6 @@
 
 
     <dependencies>
-        <!--
-            <dependency>
-              <groupId>junit</groupId>
-              <artifactId>junit</artifactId>
-              <version>3.8.1</version>
-              <scope>test</scope>
-            </dependency>
-        -->
         <dependency>
             <groupId>org.apache.openjpa</groupId>
             <artifactId>openjpa-all</artifactId>

Modified: incubator/ode/trunk/dao-jpa/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa/pom.xml?view=diff&rev=512456&r1=512455&r2=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa/pom.xml (original)
+++ incubator/ode/trunk/dao-jpa/pom.xml Tue Feb 27 14:30:33 2007
@@ -44,6 +44,62 @@
             <groupId>org.apache.openjpa</groupId>
             <artifactId>openjpa-all</artifactId>
         </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ode</groupId>
+            <artifactId>ode-bpel-dao</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.0</version>
+
+                <executions>
+                    <execution>
+                        <phase>compile</phase>
+                        <id>OpenJPA Enhancer</id>
+                        <configuration>
+                            <tasks>
+                                <java classname="org.apache.openjpa.enhance.PCEnhancer">
+                                    <arg line="-p ${basedir}/src/main/resources/META-INF/persistence.xml"/>
+                                    <arg line="-d ${basedir}/target/classes"/>
+                                    <classpath>
+                                        <path refid="maven.dependency.classpath"/>
+                                    </classpath>
+                                </java>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <phase>validate</phase>
+                        <id>Cleanup</id>
+                        <configuration>
+                            <tasks>
+                                <delete><fileset dir="${basedir}" includes="target/classes/**/*.class"/></delete>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
 </project>

Added: incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java?view=auto&rev=512456
==============================================================================
--- incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java (added)
+++ incubator/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java Tue Feb 27 14:30:33 2007
@@ -0,0 +1,94 @@
+package org.apache.ode.dao.jpa;
+
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
+import org.apache.openjpa.ee.ManagedRuntime;
+
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class BPELDAOConnectionFactoryImpl implements BpelDAOConnectionFactoryJDBC {
+
+    private EntityManagerFactory _emf;
+
+    private TransactionManager _tm;
+
+    private DataSource _ds;
+
+    private Object _dbdictionary;
+
+    private DataSource _unmanagedDS;
+
+    public BPELDAOConnectionFactoryImpl() {
+    }
+
+    public BpelDAOConnection getConnection() {
+        return new BPELDAOConnectionImpl(_emf.createEntityManager());
+    }
+
+    public void init(Properties properties) {
+        HashMap<String, Object> propMap = new HashMap<String,Object>();
+
+        propMap.put("javax.persistence.nonJtaDataSource", _unmanagedDS == null ? _ds : _unmanagedDS);
+        propMap.put("openjpa.Log", "DefaultLevel=TRACE");
+//        propMap.put("openjpa.Log", "log4j");
+        propMap.put("openjpa.jdbc.DBDictionary", "org.apache.openjpa.jdbc.sql.DerbyDictionary");
+
+//        propMap.put("openjpa.ManagedRuntime", new TxMgrProvider());
+//        propMap.put("openjpa.ConnectionDriverName", org.apache.derby.jdbc.EmbeddedDriver.class.getName());
+//        propMap.put("javax.persistence.nonJtaDataSource", _unmanagedDS == null ? _ds : _unmanagedDS);
+//        propMap.put("javax.persistence.DataSource", _ds);
+//        propMap.put("openjpa.Log", "DefaultLevel=TRACE");
+//        propMap.put("openjpa.jdbc.DBDictionary", "org.apache.openjpa.jdbc.sql.DerbyDictionary");
+        if (_dbdictionary != null)
+            propMap.put("openjpa.jdbc.DBDictionary", _dbdictionary);
+
+        if (properties != null)
+            for (Map.Entry me : properties.entrySet())
+                propMap.put((String)me.getKey(),me.getValue());
+
+        _emf = Persistence.createEntityManagerFactory("ode-dao", propMap);
+    }
+
+    public void setTransactionManager(TransactionManager tm) {
+        _tm = tm;
+    }
+
+    public void setDataSource(DataSource datasource) {
+        _ds = datasource;
+
+    }
+
+    public void setDBDictionary(String dbd) {
+        _dbdictionary = dbd;
+    }
+
+    public void setTransactionManager(Object tm) {
+        _tm = (TransactionManager) tm;
+
+    }
+
+    public void setUnmanagedDataSource(DataSource ds) {
+        _unmanagedDS = ds;
+    }
+
+    public void shutdown() {
+        _emf.close();
+    }
+
+
+    private class TxMgrProvider implements ManagedRuntime {
+        public TxMgrProvider() {
+        }
+
+        public TransactionManager getTransactionManager() throws Exception {
+            return _tm;
+        }
+    }
+
+}