You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/01 19:07:41 UTC

svn commit: r561873 - in /ode/branches/bart/bpel-runtime/src: main/java/org/apache/ode/bpel/engine/ main/java/org/apache/ode/bpel/memdao/ test/java/org/apache/ode/bpel/runtime/

Author: mszefler
Date: Wed Aug  1 10:07:35 2007
New Revision: 561873

URL: http://svn.apache.org/viewvc?view=rev&rev=561873
Log:
change meximpl to allow defered creation of DAO (until invokeXXX is 
called). this saves a transaction on ASYNC/BLOCKING.

Added:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java   (with props)
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java   (with props)
Removed:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Modified:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
    ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -1,19 +1,24 @@
 package org.apache.ode.bpel.engine;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
  * For invoking the engine using ASYNC style.
  * 
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
  * 
  */
 public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
@@ -21,8 +26,25 @@
 
     ResponseFuture _future;
     
-    public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
-        super(process, mexId);
+    public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+        super(process, mexId, oplink, operation, callee);
+    }
+
+    /**
+     * Override the setStatus(...) to notify our future when there is a response/failure.
+     */
+    protected void setStatus(Status status) {
+        Status old = getStatus();
+        super.setStatus(status);
+        if (_future != null) {
+            if (getMessageExchangePattern() == MessageExchangePattern.REQUEST_ONLY) {
+                if (old == Status.REQUEST && old != status)
+                    _future.done(status);
+            } else /* two-way */ {
+                if ((old == Status.ASYNC || old == Status.REQUEST) && status != Status.ASYNC)
+                    _future.done(status);
+            }
+        }
     }
 
     public Future<Status> invokeAsync() {
@@ -30,31 +52,25 @@
             return _future;
         
         _future = new ResponseFuture();
+        _process.enqueueTransaction(new Callable<Void>() {
 
-        if (_process.isInMemory()) {
-            _process.invokeProcess(_process.getInMemMexDAO(_mexId));
-        } else {
-            doInTX(new InDbAction<Void>() {
-                public Void call(MessageExchangeDAO mexdao) {
-                    scheduleInvoke(_process);
-                    return null;
-                }
-            });
-        }
+            public Void call() throws Exception {
+                MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+                save(dao);
+                if (_process.isInMemory()) 
+                    _process.invokeProcess(dao);
+                else
+                    scheduleInvoke();
+                return null;
+            }
+            
+        });
       
-        if (getOperation().getOutput() == null) {
-            _future.done(getStatus());
-        }
-
+        
         return _future;
 
     }
 
-    protected void onMessageExchangeComplete(MessageExchangeDAO mexdao) {
-        load(mexdao);
-        _future.done(getStatus());         
-    }
-    
     private static class ResponseFuture implements Future<Status> {
         private Status _status;
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -1,7 +1,6 @@
 package org.apache.ode.bpel.engine;
 
 import javax.wsdl.Operation;
-import javax.wsdl.PortType;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -11,6 +10,7 @@
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
  * Implementation of the {@link PartnerRoleMessageExchange} interface that is used when the ASYNC invocation style is being used
@@ -18,16 +18,16 @@
  * object) until the ODE transaction has committed, and it does not block during the performance of the operation. Hence, when a
  * reply becomes available, we'll need to schedule a transaction to process it.
  * 
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
  * 
  */
 public class AsyncPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
 
     private static final Log __log = LogFactory.getLog(AsyncPartnerRoleMessageExchangeImpl.class);
     
-    AsyncPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType portType, Operation operation,
+    AsyncPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,
             EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
-        super(process, mexId, portType, operation, epr, myRoleEPR, channel);
+        super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
     }
 
     @Override

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -5,20 +5,24 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
  * Non-transaction blocking MyRole message-exchange implementation.
  * 
- * @author Maciej Szefler 
+ * @author Maciej Szefler <mszefler at gmail dot com>
  */
 public class BlockingMyRoleMessageExchangeImpl extends AsyncMyRoleMessageExchangeImpl {
     Future<Status> _future;
     boolean _done = false;
     
-    public BlockingMyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
-        super(process, mexId);
+    public BlockingMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+        super(process, mexId, oplink, operation, callee);
     }
 
     @Override
@@ -29,14 +33,14 @@
     @Override
     public Status invokeBlocking() throws BpelEngineException, TimeoutException {
         if (_done) 
-            return _status;
+            return getStatus();
 
         Future<Status> future = _future != null ? _future : super.invokeAsync();
         
         try {
-            _status = future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
+            future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
             _done = true;
-            return _status;
+            return getStatus();
         } catch (InterruptedException e) {
             throw new BpelEngineException(e);
         } catch (ExecutionException e) {

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -1,7 +1,6 @@
 package org.apache.ode.bpel.engine;
 
 import javax.wsdl.Operation;
-import javax.wsdl.PortType;
 
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
@@ -9,6 +8,7 @@
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
  * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the 
@@ -18,13 +18,13 @@
  *
  * This InvocationStyle makes this class rather trivial. 
  *  
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
  *
  */
 public class BlockingPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
 
-    BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType portType, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
-        super(process, mexId, portType, operation, epr, myRoleEPR, channel);
+    BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+        super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
     }
 
     /**

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java Wed Aug  1 10:07:35 2007
@@ -28,7 +28,12 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Encapsulates transactional access to the BPEL database.
+ * Mostly of historical interest. Provides transactional access to the BPEL database, defines a Callable-style
+ * interface for transactions. 
+ * 
+ * <p>Should probably be eliminated. --mszefler 2007-07-26 </p>
+ * 
+ * @author Maciej Szefler <mszefler at gmail dot com>
  */
 class BpelDatabase {
     static Log __log = LogFactory.getLog(BpelDatabase.class);

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java Wed Aug  1 10:07:35 2007
@@ -22,6 +22,8 @@
 
 /**
  * Interface implemented by listeners of BPEL events.
+ * 
+ * @author Maciej Szefler <mszefler at gmail dot com>
  */
 public interface BpelEventListener {
 

Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=auto&rev=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java Wed Aug  1 10:07:35 2007
@@ -0,0 +1,151 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.runtime.PROCESS;
+
+/**
+ * Objects used for synchronizing the execution of instance-level work. All work on behalf of an instance is funneled to one of
+ * these objects. For all practical purposes these are singletons, with the caveat that they expire as soon as all work for an
+ * instance is complete so they may be recreated on-demand. The effect is that all work for an instance occurs in a single thread.
+ * 
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ * 
+ */
+class BpelInstanceWorker implements Runnable {
+
+    private static final Log __log = LogFactory.getLog(BpelInstanceWorker.class);
+
+    final BpelProcess _process;
+
+    final Long _iid;
+
+    final Contexts _contexts;
+
+    private boolean _running = false;
+
+    private ArrayList<Runnable> _todoQueue = new ArrayList<Runnable>();
+
+    private final ThreadLocal<Long> _activeInstance = new ThreadLocal<Long>();
+
+    BpelInstanceWorker(BpelProcess process, Long iid) {
+        _process = process;
+        _iid = iid;
+        _contexts = _process._contexts;
+    }
+
+    Long getIID() {
+        return _iid;
+    }
+
+    /**
+     * Add a task for this instance.
+     * 
+     * @param runnable
+     */
+    synchronized void enqueue(Runnable runnable) {
+        _todoQueue.add(runnable);
+        // We mayh need to reschedule this thread if we've dropped out of the end of the run() method.
+        if (!_running) {
+            _running = true;
+            _process.scheduleRunnable(this);
+        }
+    }
+ 
+    /**
+     * Execute some work on behalf of the instance, but don't do it in the worker thread, instead do it in the calling thread. Why
+     * bother? Well, sometimes we need to do some work in the current thread because it is associated with some transaction we'd
+     * like to use, but we don't want to go through the suspend/resume BS. Ok, so why not just do the work directly? Well we want to
+     * do the work as if it was occuring in our worker thread, so we have to block it for the duration of the action.
+     * 
+     * 
+     * @param <T>
+     *            parameterization of {@link Callable}
+     * @param callable
+     *            the thing to call
+     * @return return value of the callble
+     * @throws Exception
+     *             forwarded from {@link Callable#call()}
+     */
+    synchronized <T> T execInCurrentThread(Callable<T> callable) throws Exception {
+        final Semaphore ready = new Semaphore(0);
+        final Semaphore finished = new Semaphore(0);
+        enqueue(new Runnable() {
+            public void run() {
+                ready.release();
+                try {
+                    finished.acquire();
+                } catch (InterruptedException ie) {
+                    __log.error("Thread interrupted.", ie);
+                    throw new BpelEngineException("Thread interrupted.", ie);
+                }
+            }
+        });
+        try {
+            ready.acquire();
+        } catch (InterruptedException ex) {
+            __log.error("Thread interrupted.", ex);
+            throw new BpelEngineException("Thread interrupted.", ex);
+        }
+
+        _activeInstance.set(_iid);
+        try {
+            return callable.call();
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            finished.release();
+            _activeInstance.set(null);
+        }
+
+    }
+
+   
+
+    /**
+     * Implementation of the {@link Runnable} interface.
+     */
+    public void run() {
+        _activeInstance.set(_iid);
+        try {
+
+            do {
+                Runnable next;
+                synchronized (this) {
+                    if (_todoQueue.isEmpty()) {
+                        // This is the only way to drop out of this method short of some disasterous error. This is
+                        // important since we need to synchronize _running with _todoQueue state.
+                        _running = false;
+                        return;
+                    }
+
+                    next = _todoQueue.remove(0);
+                }
+
+                try {
+                    next.run();
+                } catch (Throwable t) {
+                    __log.error("Unexpected error in instance thread.", t);
+                }
+            } while (true);
+        } finally {
+            _activeInstance.set(null);
+        }
+    }
+
+    public String toString() {
+        return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + _iid + "}";
+    }
+
+    public boolean isWorkerThread() {
+        return _activeInstance.get() != null;
+    }
+
+}

Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java?view=auto&rev=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java Wed Aug  1 10:07:35 2007
@@ -0,0 +1,56 @@
+package org.apache.ode.bpel.engine;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.WeakHashMap;
+
+/**
+ * A cache of {@link BpelInstanceWorker} objects. 
+ * 
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class BpelInstanceWorkerCache {
+    private HashMap<Long, WeakReference<BpelInstanceWorker>> _cache = new HashMap<Long, WeakReference<BpelInstanceWorker>>();
+    private ReferenceQueue<BpelInstanceWorker> _refQ = new ReferenceQueue<BpelInstanceWorker>();
+    
+    private BpelProcess _process;
+    
+    public BpelInstanceWorkerCache(BpelProcess process) {
+        _process = process;
+    }
+    
+    synchronized BpelInstanceWorker get(long iid) {
+        expungeStaleEntries();
+        WeakReference<BpelInstanceWorker> wref = _cache.get(iid);
+        BpelInstanceWorker worker;
+        
+        // Case: not in cache.
+        if (wref == null) {
+            worker = new BpelInstanceWorker(_process, iid);
+            wref = new WeakReference<BpelInstanceWorker>(worker,_refQ);
+            _cache.put(iid, wref);
+        } else {
+            worker = wref.get();
+            
+            // Case: garbage collected
+            if (worker == null) {
+                worker = new BpelInstanceWorker(_process, iid);
+                wref = new WeakReference<BpelInstanceWorker>(worker,_refQ);
+                _cache.put(iid, wref); 
+            }
+        }
+
+        return worker;
+    }
+    
+    
+    private void expungeStaleEntries() {
+        Reference<? extends BpelInstanceWorker> x;
+        while ((x=_refQ.poll()) != null){
+            _cache.values().remove(x);
+        }
+    }
+}

Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java Wed Aug  1 10:07:35 2007
@@ -40,6 +40,8 @@
  * Implementation of the instance/process management interaction. This class implements
  * the methods necessary to support process debugging. It also implements all the methods in the
  * newer Process/Instance Management interface (pmapi).
+ * 
+ * @author Maciej Szefler <mszefler at gmail dot com>
  */
 public class BpelManagementFacadeImpl extends ProcessAndInstanceManagementImpl
         implements BpelManagementFacade {

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Aug  1 10:07:35 2007
@@ -19,6 +19,7 @@
 package org.apache.ode.bpel.engine;
 
 import java.io.InputStream;
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +29,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 
 import javax.wsdl.Operation;
@@ -72,6 +74,7 @@
 import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
 import org.apache.ode.bpel.runtime.channels.FaultData;
 import org.apache.ode.jacob.soup.ReplacementMap;
+import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.ObjectPrinter;
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
@@ -82,10 +85,10 @@
 /**
  * Entry point into the runtime of a BPEL process.
  * 
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
  * @author Matthieu Riou <mriou at apache dot org>
  */
-public class BpelProcess {
+class BpelProcess {
     static final Log __log = LogFactory.getLog(BpelProcess.class);
 
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
@@ -140,15 +143,9 @@
 
     final BpelServerImpl _server;
 
-    /** Indicates whether we are operating in a server-managed transaction. */
-    private ThreadLocal<Boolean> _serverTx = new ThreadLocal<Boolean>() {
-        @Override
-        protected Boolean initialValue() {
-            return Boolean.FALSE;
-        }
-    };
+    final private List<WeakReference<MyRoleMessageExchangeImpl>> _mexStateListeners = new CopyOnWriteArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
 
-    public BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
+    BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
         _server = server;
         _pid = conf.getProcessId();
         _pconf = conf;
@@ -171,7 +168,7 @@
         return "BpelProcess[" + _pid + "]";
     }
 
-    public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
+    void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
         if (__log.isDebugEnabled())
             __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
         markused();
@@ -180,13 +177,6 @@
         // processInstance.recoverActivity(channel, activityId, action, fault);
     }
 
-    static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
-        StringBuffer sb = new StringBuffer(partnerlinkName);
-        sb.append('.');
-        sb.append(operationName);
-        return sb.toString();
-    }
-
     /**
      * Entry point for message exchanges aimed at the my role.
      * 
@@ -204,6 +194,7 @@
                 mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
                 mexdao.setFaultExplanation(errmsg);
                 mexdao.setStatus(Status.FAILURE.toString());
+                fireMexStateEvent(mexdao);
                 return;
             }
 
@@ -249,7 +240,9 @@
         // }
     }
 
-    void executeCreateInstance(MessageExchangeDAO mexdao) {
+    private void executeCreateInstance(MessageExchangeDAO mexdao) {
+        assert _hydrationLatch.isLatched(1);
+        
         BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
         BpelRuntimeContextImpl instanceCtx = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), new PROCESS(_oprocess),
@@ -257,7 +250,9 @@
         instanceCtx.execute();
     }
 
-    void executeContinueInstance(MessageExchangeDAO mexdao) {
+    private void executeContinueInstance(MessageExchangeDAO mexdao) {
+        assert _hydrationLatch.isLatched(1);
+
         BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
         BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
@@ -305,7 +300,9 @@
     }
 
     private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
-        for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : getEndpointToMyRoleMap().entrySet()) {
+        assert _hydrationLatch.isLatched(1);
+        
+        for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : _endpointToMyRoleMap.entrySet()) {
             if (e.getKey().serviceName.equals(serviceName))
                 return e.getValue();
         }
@@ -362,18 +359,7 @@
             return null;
     }
 
-    /**
-     * Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned.
-     * If the part is an XML schema typed part, then the name of the part is returned in the null namespace.
-     * 
-     * @param part
-     *            WSDL {@link javax.wsdl.Part}
-     * @return name of element containing said part
-     */
-    static QName getElementNameForPart(OMessageVarType.Part part) {
-        return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name);
-    }
-
+    
     /**
      * Process the message-exchange interceptors.
      * 
@@ -387,7 +373,7 @@
         // for (MessageExchangeInterceptor i : _mexInterceptors)
         // if (!mex.processInterceptor(i, mex, ictx, invoker))
         // return false;
-        // for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
+        // for (MessageExchangeInterceptor i : getEngine().getInterceptors())
         // if (!mex.processInterceptor(i, mex, ictx, invoker))
         // return false;
         //
@@ -401,7 +387,7 @@
      * @throws JobProcessorException
      * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
      */
-    public void handleWorkEvent(final JobInfo jobInfo) throws JobProcessorException {
+    void handleWorkEvent(final JobInfo jobInfo) throws JobProcessorException {
         assert !_contexts.isTransacted() : "work events must be received outside of a transaction";
 
         markused();
@@ -445,7 +431,7 @@
      * @param tx
      *            the transaction
      */
-    private <T> Future<T> enqueueTransaction(final Callable<T> tx) {
+    <T> Future<T> enqueueTransaction(final Callable<T> tx) {
         // We have to wrap our transaction to make sure that we are hydrated when the transaction runs.
         return _server.enqueueTransaction(new ProcessCallable<T>(tx));
     }
@@ -711,34 +697,63 @@
         }
     }
 
-    MyRoleMessageExchangeImpl createMyRoleMex(MessageExchangeDAO mexdao) {
+    private MyRoleMessageExchangeImpl newMyRoleMex(
+            InvocationStyle istyle, 
+            String mexId, 
+            QName target, 
+            OPartnerLink oplink, 
+            Operation operation) {
+        MyRoleMessageExchangeImpl mex;
+        switch (istyle) {
+        case RELIABLE:
+            mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink,operation, target);
+            break;
+        case ASYNC:
+            mex = new AsyncMyRoleMessageExchangeImpl(this, mexId,oplink,operation,target);
+            break;
+        case TRANSACTED:
+            mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink,operation,target);
+            break;
+        case BLOCKING:
+            mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink,operation,target);
+            break;
+        default:
+            throw new AssertionError("Unexpected invocation style: " + istyle);
+
+        }
+        
+        registerMyRoleMex(mex);
+        return mex;
+    }
+    
+    MyRoleMessageExchangeImpl recreateMyRoleMex(MessageExchangeDAO mexdao) {
         InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
 
         _hydrationLatch.latch(1);
         try {
-            MyRoleMessageExchangeImpl mex;
-            switch (istyle) {
-            case RELIABLE:
-                mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
-                break;
-            case ASYNC:
-                mex = new AsyncMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
-                break;
-            case TRANSACTED:
-                mex = new TransactedMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
-                break;
-            case BLOCKING:
-                mex = new BlockingMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
-                break;
-            default:
-                throw new AssertionError("Unexpected invocation style: " + istyle);
-
-            }
             OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
-            PortType ptype = plink.myRolePortType;
-            Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+            if (plink == null) {
+                String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ mexdao.getMessageExchangeId() + " referenced unknown pLinkModelId " + mexdao.getPartnerLinkModelId());
+                __log.error(errmsg);
+                throw new BpelEngineException(errmsg);
+            }
+            
+            Operation op  = plink.getMyRoleOperation(mexdao.getOperation());
+            if (op == null) {
+                String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ mexdao.getMessageExchangeId() + " referenced unknown operation " + mexdao.getOperation());
+                __log.error(errmsg);
+                throw new BpelEngineException(errmsg);                
+            }
+            
+            PartnerLinkMyRoleImpl myRole = _myRoles.get(plink);
+            if (myRole == null) {
+                String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ mexdao.getMessageExchangeId() + " referenced non-existant myrole");
+                __log.error(errmsg);
+                throw new BpelEngineException(errmsg);                                
+            }
+            
+            MyRoleMessageExchangeImpl mex = newMyRoleMex(istyle, mexdao.getMessageExchangeId(), myRole._endpoint.serviceName, plink, op);
             mex.load(mexdao);
-            mex.init(ptype, op, MessageExchangePattern.valueOf(mexdao.getPattern()));
             return mex;
         } finally {
             _hydrationLatch.release(1);
@@ -751,27 +766,26 @@
         _hydrationLatch.latch(1);
         try {
             OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
-            PortType ptype = plink.partnerRolePortType;
             Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
             switch (istyle) {
             case BLOCKING:
-                mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+                mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
                 plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
             case ASYNC:
-                mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+                mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
                 plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
 
             case TRANSACTED:
-                mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /*
+                mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /*
                                                                                                                              * EPR
                                                                                                                              * todo
                                                                                                                              */
                 plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
             case RELIABLE:
-                mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+                mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
                 plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
 
@@ -779,7 +793,7 @@
                 throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
 
             }
-            
+
             mex.load(mexdao);
             return mex;
         } finally {
@@ -792,22 +806,33 @@
         return _invocationStyles;
     }
 
-    private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
-        _hydrationLatch.latch(1);
-        try {
-            return _endpointToMyRoleMap;
-        } finally {
-            _hydrationLatch.release(1);
+    /**
+     * Find the partner-link-my-role that corresponds to the given service name.
+     * 
+     * @param serviceName
+     *            name of service
+     * @return corresponding {@link PartnerLinkMyRoleImpl}
+     */
+    private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
+        assert _hydrationLatch.isLatched(1);
+        
+        PartnerLinkMyRoleImpl target = null;
+        for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
+            if (endpoint.serviceName.equals(serviceName))
+                target = _endpointToMyRoleMap.get(endpoint);
         }
-    }
 
-    public ReplacementMap getReplacementMap() {
-        _hydrationLatch.latch(1);
-        try {
-            return _replacementMap;
-        } finally {
-            _hydrationLatch.release(1);
-        }
+        return target;
+
+    }
+    
+    /**
+     * Used by {@link BpelRuntimeContextImpl} constructor. Should only be called from latched context. 
+     * @return
+     */
+    ReplacementMap getReplacementMap() {
+        assert _hydrationLatch.isLatched(1);
+        return _replacementMap;
     }
 
     public boolean isInMemory() {
@@ -978,11 +1003,11 @@
 
     }
 
-    MessageExchangeDAO createMessageExchange(final char dir) {
+    MessageExchangeDAO createMessageExchange(String mexId, final char dir) {
         if (isInMemory()) {
-            return _inMemDao.getConnection().createMessageExchange(dir);
+            return _inMemDao.getConnection().createMessageExchange(mexId, dir);
         } else {
-            return _contexts.dao.getConnection().createMessageExchange(dir);
+            return _contexts.dao.getConnection().createMessageExchange(mexId, dir);
         }
     }
 
@@ -996,15 +1021,13 @@
      * 
      * @param runnable
      */
-    public void scheduleRunnable(final Runnable runnable) {
+    void scheduleRunnable(final Runnable runnable) {
         if (__log.isDebugEnabled())
             __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
 
         _server.scheduleRunnable(new ProcessRunnable(runnable));
     }
 
-
-    
     class ProcessRunnable implements Runnable {
         Runnable _work;
 
@@ -1043,69 +1066,52 @@
 
     }
 
-    public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService, final String operation, final String clientKey) {
+    public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, 
+            final QName targetService,
+            final String operation, 
+            final String clientKey) {
+        
+        final String mexId = new GUID().toString();
         _hydrationLatch.latch(1);
         try {
-            
-        final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
-        if (target == null)
-            throw new BpelEngineException("NoSuchService: " + targetService);
-        final Operation op = target._plinkDef.getMyRoleOperation(operation);
-        if (op == null)
-            throw new BpelEngineException("NoSuchOperation: " + operation);
-        
-        final MessageExchangePattern pattern = op.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
-                    : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
 
-        Callable<MyRoleMessageExchange> createDao = new Callable<MyRoleMessageExchange>() {
+            final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
+            if (target == null)
+                throw new BpelEngineException("NoSuchService: " + targetService);
+            final Operation op = target._plinkDef.getMyRoleOperation(operation);
+            if (op == null)
+                throw new BpelEngineException("NoSuchOperation: " + operation);
 
-            public MyRoleMessageExchange call() throws Exception {
-                MessageExchangeDAO dao = createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-                dao.setInvocationStyle(istyle.toString());
-                dao.setCorrelationId(clientKey);
-                dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
-                dao.setPattern(pattern.toString());
-                dao.setCallee(targetService);
-                dao.setStatus(Status.NEW.toString());
-                dao.setOperation(operation);
-                dao.setPartnerLinkModelId(target._plinkDef.getId());
-                dao.setTimeout(30 * 1000); // default timeout is 30 seconds, can be chaged by client.
-                return createMyRoleMex(dao);
-            }
-
-        };
-
-        try {
-            if (isInMemory() || istyle == InvocationStyle.TRANSACTED || istyle == InvocationStyle.RELIABLE) 
-                return createDao.call();
-            else
-                return _contexts.execTransaction(createDao);
+            return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, target._plinkDef, op);  
             
-        } catch (BpelEngineException be) {
-            throw be;
-        } catch (Exception e) {
-            __log.error("Internal Error: could not create message exchange.", e);
-            throw new BpelEngineException("Internal Error", e);
+        } finally {
+            _hydrationLatch.release(1);
         }
-        
-    } finally {
-        _hydrationLatch.release(1);
-    }
     }
 
-    /**
-     * Find the partner-link-my-role that corresponds to the given service name.
-     * @param serviceName name of service
-     * @return corresponding {@link PartnerLinkMyRoleImpl}
-     */
-    private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
-        PartnerLinkMyRoleImpl target = null;
-        for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
-            if (endpoint.serviceName.equals(serviceName))
-                target = getEndpointToMyRoleMap().get(endpoint);
+    void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+        _mexStateListeners.add(new WeakReference<MyRoleMessageExchangeImpl>(mymex));
+    }
+    
+    void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+        ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
+        for (WeakReference<MyRoleMessageExchangeImpl> wref : _mexStateListeners) { 
+            MyRoleMessageExchangeImpl mex = wref.get();
+            if (mex == null || mex == mymex)
+                needsRemoval.add(wref);
+        }
+        _mexStateListeners.removeAll(needsRemoval);
+            
+    }
+    
+    void fireMexStateEvent(MessageExchangeDAO mexdao) {
+        for (WeakReference<MyRoleMessageExchangeImpl> wr:  _mexStateListeners) {
+            MyRoleMessageExchangeImpl mymex = wr.get();
+            if (mymex != null && mymex.getMessageExchangeId() != null)
+                mymex.onStateChanged(mexdao);
         }
-        
-        return target;
 
     }
+
+
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Aug  1 10:07:35 2007
@@ -135,7 +135,9 @@
 
     private boolean _executed;
 
-    public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
+    public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, 
+            ProcessInstanceDAO dao, 
+            PROCESS PROCESS,
             MessageExchangeDAO instantiatingMessageExchange) {
         _instanceWorker = instanceWorker;
         _bpelProcess = instanceWorker._process;
@@ -558,7 +560,6 @@
                 scheduleAsyncResponse(myrolemex);
                 break;
             default:
-                // DO NOTHING
                 break;
             }
 
@@ -693,7 +694,7 @@
         evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
         evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
 
-        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
         mexDao.setStatus(MessageExchange.Status.NEW.toString());
         mexDao.setOperation(operation.getName());
         mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
@@ -779,22 +780,22 @@
         if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
             // If RELIABLE is supported, this is easy, we just do it in-line.
             ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
-                    .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+                    .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
             _contexts.mexContext.invokePartnerReliable(reliableMex);
         } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
             // If TRANSACTED is supported, this is again easy, do it in-line.
             TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
-                    mexDao.getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+                    mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
             _contexts.mexContext.invokePartnerTransacted(transactedMex);
         } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
             // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
             BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
-                    .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+                    .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
             schedule(new BlockingInvoker(blockingMex));
         } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
             // For ASYNC style, we defer the call until after commit (unless idempotent).
             AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
-                    .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+                    .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
             schedule(new AsyncInvoker(asyncMex));
             
         } else {
@@ -866,7 +867,7 @@
         if (__log.isDebugEnabled())
             __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
 
-        MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+        MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
         myRoleMex.setCallee(serviceName);
         myRoleMex.setPipedMessageExchange(partnerRoleMex);
         myRoleMex.setOperation(partnerRoleMex.getOperation());
@@ -1393,7 +1394,7 @@
         assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
         assert _contexts.isTransacted();
 
-        final MyRoleMessageExchangeImpl mex = _bpelProcess.createMyRoleMex(mexdao);
+        final MyRoleMessageExchangeImpl mex = _bpelProcess.recreateMyRoleMex(mexdao);
         _pendingMyRoleReplies.add(mex);
     }
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Aug  1 10:07:35 2007
@@ -18,7 +18,6 @@
  */
 package org.apache.ode.bpel.engine;
 
-import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -57,9 +56,6 @@
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -89,8 +85,6 @@
 
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
-    private final List<WeakReference<MessageExchangeStateListener>> _mexStateListeners = new ArrayList<WeakReference<MessageExchangeStateListener>>();
-
     /** Maximum age of a process before it is quiesced */
     private static Long __processMaxAge;
 
@@ -525,7 +519,7 @@
                     case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
                         return process.createPartnerRoleMex(mexdao);
                     case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-                        return process.createMyRoleMex(mexdao);
+                        return process.recreateMyRoleMex(mexdao);
                     default:
                         String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
                         __log.fatal(errmsg);
@@ -584,11 +578,6 @@
         return null;
     }
     
-    void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
-        WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
-
-    }
-
     OProcess getOProcess(QName processId) {
         _mngmtLock.readLock().lock();
         try {

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -37,6 +37,7 @@
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
 
@@ -73,32 +74,29 @@
 
     final BpelProcess _process;
 
+    final OPartnerLink _oplink;
+
+    /** Message-exchange id. */
     final String _mexId;
 
-    /** Instance identifier. */
-    Long _iid;
+    final PortType _portType;
 
-    PortType _portType;
+    final Operation _operation;
 
-    Operation _operation;
+    /** Instance identifier. */
+    Long _iid;
 
     EndpointReference _epr;
 
-    MessageExchangePattern _pattern;
-
-    String _opname;
-
     MessageImpl _request;
 
-    boolean _associated;
-
     /** The point at which this message-exchange will time out. */
-    long _timeout;
+    long _timeout = 30 * 1000;
 
     //
     // The following fields need to be volatile, since a random  IL thread may set them.
     //
-    volatile Status _status;
+    private volatile Status _status = Status.NEW;
 
     volatile QName _fault;
 
@@ -132,23 +130,30 @@
     private Set<String> _propNames;
 
 
-    public MessageExchangeImpl(BpelProcess process, String mexId) {
+
+    public MessageExchangeImpl(
+            BpelProcess process, 
+            String mexId,
+            OPartnerLink oplink, 
+            PortType ptype, 
+            Operation operation) {
         _process = process;
-        _contexts = process._contexts; 
+        _contexts = process._contexts;
         _mexId = mexId;
+        _oplink = oplink;
+        _portType  = ptype;
+        _operation = operation;
     }
 
     @Override
     public boolean equals(Object other) {
         return _mexId.equals(((MessageExchangeImpl)other)._mexId);
     }
+
     
     void load(MessageExchangeDAO dao) {
-        if (!dao.getMessageExchangeId().equals(_mexId))
-            throw new IllegalArgumentException("MessageExchangeId mismatch!");
-        _pattern = MessageExchangePattern.valueOf(dao.getPattern());
-        _opname = dao.getOperation();
         _timeout = dao.getTimeout();
+        _iid = dao.getInstance() != null ? dao.getInstance().getInstanceId() : null;
         
         if (_fault == null)
             _fault = dao.getFault();
@@ -159,13 +164,21 @@
     }
 
     public void save(MessageExchangeDAO dao) {
+        dao.setPartnerLinkModelId(_oplink.getId());
+        dao.setOperation(_operation.getName());
         dao.setStatus(_status.toString());
         dao.setInvocationStyle(getInvocationStyle().toString());
         dao.setFault(_fault);
         dao.setFaultExplanation(_explanation);
         dao.setTimeout(_timeout);
-        // todo: set failureType
+        dao.setFailureType(_failureType == null ? null : _failureType.toString());
+        
 
+        if (_changes.contains(Change.REQUEST)) {
+            MessageDAO requestDao = dao.createMessage(_request.getType());
+            requestDao.setData(_request.getMessage());            
+        }
+        
         if (_changes.contains(Change.RESPONSE)) {
             MessageDAO responseDao = dao.createMessage(_response.getType());
             responseDao.setData(_response.getMessage());
@@ -212,11 +225,11 @@
     }
 
     public String getOperationName() throws BpelEngineException {
-        return _opname;
+        return getOperation().getName();
     }
 
     public MessageExchangePattern getMessageExchangePattern() {
-        return _pattern;
+        return _operation.getOutput()==null ? MessageExchangePattern.REQUEST_ONLY : MessageExchangePattern.REQUEST_RESPONSE; 
     }
 
     public boolean isTransactional() throws BpelEngineException {
@@ -283,14 +296,7 @@
         });
     }
 
-    void init(PortType portType, Operation operation, MessageExchangePattern pattern) {
-        if (__log.isTraceEnabled())
-            __log.trace("Mex[" + getMessageExchangeId() + "].setPortOp(" + portType + "," + operation + ")");
-        _portType = portType;
-        _operation = operation;
-        _pattern = pattern;
-    }
-
+    
     void setFault(QName faultType, Message outputFaultMessage) throws BpelEngineException {
         setStatus(Status.FAULT);
         _fault = faultType;
@@ -409,7 +415,7 @@
             return action.call(getDAO());
         } else {
             try {
-                return _process._server.enqueueTransaction(new Callable<T>() {
+                return _process.enqueueTransaction(new Callable<T>() {
                     public T call() throws Exception {
                         assertTransaction();
                         return action.call(getDAO());

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java Wed Aug  1 10:07:35 2007
@@ -190,4 +190,8 @@
         return format("Scheduled job failed; jobDetail={0}", jobDetail);
     }
 
+    public String msgDbConsistencyError(String detail) {
+        return format("Database consistency error: {0}", detail);
+    }
+
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -1,8 +1,10 @@
 package org.apache.ode.bpel.engine;
 
+import java.lang.ref.WeakReference;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
+import javax.wsdl.Operation;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
@@ -17,21 +19,27 @@
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 abstract class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
 
     private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
     
+    protected final QName _callee;
+
     protected CorrelationStatus _cstatus;
 
     protected String _clientId;
 
-    protected QName _callee;
-
-    public MyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
-        super(process, mexId);
+    public MyRoleMessageExchangeImpl(BpelProcess process,
+            String mexId, 
+            OPartnerLink oplink, 
+            Operation operation,
+            QName callee) {
+        super(process, mexId, oplink, oplink.myRolePortType, operation);
+        _callee = callee;
     }
-
+        
     public CorrelationStatus getCorrelationStatus() {
         return _cstatus;
     }
@@ -41,7 +49,6 @@
         super.load(dao);
         _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
         _clientId = dao.getCorrelationId();
-        _callee = dao.getCallee();
     }
 
     @Override
@@ -83,7 +90,7 @@
 
     public String toString() {
         try {
-            return "{MyRoleMex#" + _mexId + " [Client " + _clientId + "] calling " + _callee + "." + _opname + "(...)}";
+            return "{MyRoleMex#" + _mexId + " [Client " + _clientId + "] calling " + _callee + "." + getOperationName() + "(...)}";
         } catch (Throwable t) {
             return "{MyRoleMex#???}";
         }
@@ -94,7 +101,7 @@
 
     }
 
-    protected void scheduleInvoke(BpelProcess target) {
+    protected void scheduleInvoke() {
         
         assert !_process.isInMemory() : "Cannot schedule invokes for in-memory processes.";
         assert _contexts.isTransacted() : "Cannot schedule outside of transaction context.";
@@ -102,13 +109,13 @@
         // Schedule a new job for invocation
         final WorkEvent we = new WorkEvent();
         we.setType(WorkEvent.Type.MYROLE_INVOKE);
-        we.setProcessId(target.getPID());
+        we.setProcessId(_process.getPID());
         we.setMexId(_mexId);
 
         // Schedule a timeout 
         final WorkEvent we1 = new WorkEvent();
         we1.setType(WorkEvent.Type.MYROLE_INVOKE_TIMEOUT);
-        we1.setProcessId(target.getPID());
+        we1.setProcessId(_process.getPID());
         we1.setMexId(_mexId);
         
         setStatus(Status.ASYNC);
@@ -155,11 +162,13 @@
     }
 
 
-    /**
-     * Callback.
-     * 
-     * @param mexdao
-     */
-    protected void onMessageExchangeComplete(MessageExchangeDAO mexdao) {
+    protected void onStateChanged(MessageExchangeDAO mexdao) {
+        setStatus(Status.valueOf(mexdao.getStatus()));
     }
+    
+    
+    protected void finalize() {
+        _process.unregisterMyRoleMex(this);
+    }
+
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java Wed Aug  1 10:07:35 2007
@@ -107,4 +107,8 @@
             _lock.unlock();
         }
     }
+    
+    public boolean isLatched(int state) {
+        return _state == state && _depth > 0;
+    }
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -20,7 +20,6 @@
 package org.apache.ode.bpel.engine;
 
 import javax.wsdl.Operation;
-import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +32,7 @@
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
 import org.w3c.dom.Element;
 
 /**
@@ -64,14 +64,11 @@
 
     volatile boolean _blocked = false;
 
-    PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType portType, Operation operation,
+    PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,
             EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
-        super(process, mexId);
+        super(process, mexId, oplink, oplink.partnerRolePortType, operation);
         _myRoleEPR = myRoleEPR;
         _partnerRoleChannel = channel;
-
-        init(portType, operation, (operation.getOutput() == null) ? MessageExchangePattern.REQUEST_ONLY
-                : MessageExchangePattern.REQUEST_RESPONSE);
     }
 
     @Override
@@ -150,7 +147,7 @@
 
     public String toString() {
         try {
-            return "{PartnerRoleMex#" + _mexId + " [PID " + getCaller() + "] calling " + _epr + "." + _opname + "(...)}";
+            return "{PartnerRoleMex#" + _mexId + " [PID " + getCaller() + "] calling " + _epr + "." + getOperationName() + "(...)}";
 
         } catch (Throwable t) {
             return "{PartnerRoleMex#????}";
@@ -184,8 +181,8 @@
 
     protected void checkReplyContextOk() {
         // Prevent duplicate replies.
-        if (_status != MessageExchange.Status.REQUEST && _status != MessageExchange.Status.ASYNC)
-            throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + _status);
+        if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC)
+            throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
 
         // In-memory processe are special, they don't allow scheduling so any replies must be delivered immediately.
         if (!_blocked && _process.isInMemory())

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -35,7 +35,9 @@
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.o.OPartnerLink;
 
+import javax.wsdl.Operation;
 import javax.xml.namespace.QName;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,21 +59,20 @@
     public static final int TIMEOUT = 2 * 60 * 1000;
 
     
-    public ReliableMyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
-        super(process, mexId);
+    public ReliableMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+        super(process, mexId, oplink, operation, callee);
     }
 
-
     public void invokeReliable() {
         // For reliable, we MUST HAVE A TRANSACTION!
         assertTransaction();
 
         // Cover the case where invoke was already called. 
-        if (_status == Status.REQUEST)
+        if (getStatus() == Status.REQUEST)
             return;
         
-        if (_status != Status.NEW)
-            throw new BpelEngineException("Invalid state: " + _status);
+        if (getStatus() != Status.NEW)
+            throw new BpelEngineException("Invalid state: " + getStatus());
         
         if (!processInterceptors(InterceptorInvoker.__onBpelServerInvoked, getDAO())) {
             throw new BpelEngineException("Intercepted.");
@@ -81,7 +82,7 @@
             __log.debug("invoke() EPR= " + _epr + " ==> " + _process);
         setStatus(Status.REQUEST);
         save(getDAO());
-        scheduleInvoke(_process);
+        scheduleInvoke();
     }
 
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -1,7 +1,6 @@
 package org.apache.ode.bpel.engine;
 
 import javax.wsdl.Operation;
-import javax.wsdl.PortType;
 
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
@@ -10,11 +9,12 @@
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
 
-    public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType ptype, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
-        super(process, mexId, ptype, op, epr, myRoleEPR, partnerRoleChannel);
+    public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
+        super(process, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel);
     }
 
     

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Wed Aug  1 10:07:35 2007
@@ -1,9 +1,13 @@
 package org.apache.ode.bpel.engine;
 
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
  * Transacted my-role message exchange.
@@ -15,8 +19,8 @@
  */
 public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
 
-    public TransactedMyRoleMessageExchangeImpl(BpelProcess target, String mexId) {
-        super(target, mexId);
+    public TransactedMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+        super(process, mexId, oplink, operation, callee);
     }
 
     @Override

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Wed Aug  1 10:07:35 2007
@@ -181,10 +181,9 @@
         throw new UnsupportedOperationException("Can't query process configuration using a transient DAO.");
     }
 
-    public MessageExchangeDAO createMessageExchange(char dir) {
-        String id = Long.toString(counter.getAndIncrement());
-        MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir, id);
-        _mexStore.put(id, mex);
+    public MessageExchangeDAO createMessageExchange(String mexId, char dir) {
+        MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir, mexId);
+        _mexStore.put(mexId, mex);
         return mex;
     }
 

Modified: ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Wed Aug  1 10:07:35 2007
@@ -342,4 +342,9 @@
         }
     }
 
+    public void waitForBlocking() {
+        // TODO Auto-generated method stub
+        
+    }
+
 }