You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/01 19:07:41 UTC
svn commit: r561873 - in /ode/branches/bart/bpel-runtime/src:
main/java/org/apache/ode/bpel/engine/ main/java/org/apache/ode/bpel/memdao/
test/java/org/apache/ode/bpel/runtime/
Author: mszefler
Date: Wed Aug 1 10:07:35 2007
New Revision: 561873
URL: http://svn.apache.org/viewvc?view=rev&rev=561873
Log:
change meximpl to allow defered creation of DAO (until invokeXXX is
called). this saves a transaction on ASYNC/BLOCKING.
Added:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java (with props)
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java (with props)
Removed:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -1,19 +1,24 @@
package org.apache.ode.bpel.engine;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.o.OPartnerLink;
/**
* For invoking the engine using ASYNC style.
*
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
*
*/
public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
@@ -21,8 +26,25 @@
ResponseFuture _future;
- public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
- super(process, mexId);
+ public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+ super(process, mexId, oplink, operation, callee);
+ }
+
+ /**
+ * Override the setStatus(...) to notify our future when there is a response/failure.
+ */
+ protected void setStatus(Status status) {
+ Status old = getStatus();
+ super.setStatus(status);
+ if (_future != null) {
+ if (getMessageExchangePattern() == MessageExchangePattern.REQUEST_ONLY) {
+ if (old == Status.REQUEST && old != status)
+ _future.done(status);
+ } else /* two-way */ {
+ if ((old == Status.ASYNC || old == Status.REQUEST) && status != Status.ASYNC)
+ _future.done(status);
+ }
+ }
}
public Future<Status> invokeAsync() {
@@ -30,31 +52,25 @@
return _future;
_future = new ResponseFuture();
+ _process.enqueueTransaction(new Callable<Void>() {
- if (_process.isInMemory()) {
- _process.invokeProcess(_process.getInMemMexDAO(_mexId));
- } else {
- doInTX(new InDbAction<Void>() {
- public Void call(MessageExchangeDAO mexdao) {
- scheduleInvoke(_process);
- return null;
- }
- });
- }
+ public Void call() throws Exception {
+ MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ save(dao);
+ if (_process.isInMemory())
+ _process.invokeProcess(dao);
+ else
+ scheduleInvoke();
+ return null;
+ }
+
+ });
- if (getOperation().getOutput() == null) {
- _future.done(getStatus());
- }
-
+
return _future;
}
- protected void onMessageExchangeComplete(MessageExchangeDAO mexdao) {
- load(mexdao);
- _future.done(getStatus());
- }
-
private static class ResponseFuture implements Future<Status> {
private Status _status;
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -1,7 +1,6 @@
package org.apache.ode.bpel.engine;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -11,6 +10,7 @@
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
/**
* Implementation of the {@link PartnerRoleMessageExchange} interface that is used when the ASYNC invocation style is being used
@@ -18,16 +18,16 @@
* object) until the ODE transaction has committed, and it does not block during the performance of the operation. Hence, when a
* reply becomes available, we'll need to schedule a transaction to process it.
*
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
*
*/
public class AsyncPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
private static final Log __log = LogFactory.getLog(AsyncPartnerRoleMessageExchangeImpl.class);
- AsyncPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType portType, Operation operation,
+ AsyncPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,
EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
- super(process, mexId, portType, operation, epr, myRoleEPR, channel);
+ super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
}
@Override
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -5,20 +5,24 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.o.OPartnerLink;
/**
* Non-transaction blocking MyRole message-exchange implementation.
*
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
*/
public class BlockingMyRoleMessageExchangeImpl extends AsyncMyRoleMessageExchangeImpl {
Future<Status> _future;
boolean _done = false;
- public BlockingMyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
- super(process, mexId);
+ public BlockingMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+ super(process, mexId, oplink, operation, callee);
}
@Override
@@ -29,14 +33,14 @@
@Override
public Status invokeBlocking() throws BpelEngineException, TimeoutException {
if (_done)
- return _status;
+ return getStatus();
Future<Status> future = _future != null ? _future : super.invokeAsync();
try {
- _status = future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
+ future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
_done = true;
- return _status;
+ return getStatus();
} catch (InterruptedException e) {
throw new BpelEngineException(e);
} catch (ExecutionException e) {
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -1,7 +1,6 @@
package org.apache.ode.bpel.engine;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
@@ -9,6 +8,7 @@
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
/**
* Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the
@@ -18,13 +18,13 @@
*
* This InvocationStyle makes this class rather trivial.
*
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
*
*/
public class BlockingPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
- BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType portType, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
- super(process, mexId, portType, operation, epr, myRoleEPR, channel);
+ BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+ super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
}
/**
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java Wed Aug 1 10:07:35 2007
@@ -28,7 +28,12 @@
import org.apache.commons.logging.LogFactory;
/**
- * Encapsulates transactional access to the BPEL database.
+ * Mostly of historical interest. Provides transactional access to the BPEL database, defines a Callable-style
+ * interface for transactions.
+ *
+ * <p>Should probably be eliminated. --mszefler 2007-07-26 </p>
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
*/
class BpelDatabase {
static Log __log = LogFactory.getLog(BpelDatabase.class);
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEventListener.java Wed Aug 1 10:07:35 2007
@@ -22,6 +22,8 @@
/**
* Interface implemented by listeners of BPEL events.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
*/
public interface BpelEventListener {
Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=auto&rev=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java Wed Aug 1 10:07:35 2007
@@ -0,0 +1,151 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.runtime.PROCESS;
+
+/**
+ * Objects used for synchronizing the execution of instance-level work. All work on behalf of an instance is funneled to one of
+ * these objects. For all practical purposes these are singletons, with the caveat that they expire as soon as all work for an
+ * instance is complete so they may be recreated on-demand. The effect is that all work for an instance occurs in a single thread.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class BpelInstanceWorker implements Runnable {
+
+ private static final Log __log = LogFactory.getLog(BpelInstanceWorker.class);
+
+ final BpelProcess _process;
+
+ final Long _iid;
+
+ final Contexts _contexts;
+
+ private boolean _running = false;
+
+ private ArrayList<Runnable> _todoQueue = new ArrayList<Runnable>();
+
+ private final ThreadLocal<Long> _activeInstance = new ThreadLocal<Long>();
+
+ BpelInstanceWorker(BpelProcess process, Long iid) {
+ _process = process;
+ _iid = iid;
+ _contexts = _process._contexts;
+ }
+
+ Long getIID() {
+ return _iid;
+ }
+
+ /**
+ * Add a task for this instance.
+ *
+ * @param runnable
+ */
+ synchronized void enqueue(Runnable runnable) {
+ _todoQueue.add(runnable);
+ // We mayh need to reschedule this thread if we've dropped out of the end of the run() method.
+ if (!_running) {
+ _running = true;
+ _process.scheduleRunnable(this);
+ }
+ }
+
+ /**
+ * Execute some work on behalf of the instance, but don't do it in the worker thread, instead do it in the calling thread. Why
+ * bother? Well, sometimes we need to do some work in the current thread because it is associated with some transaction we'd
+ * like to use, but we don't want to go through the suspend/resume BS. Ok, so why not just do the work directly? Well we want to
+ * do the work as if it was occuring in our worker thread, so we have to block it for the duration of the action.
+ *
+ *
+ * @param <T>
+ * parameterization of {@link Callable}
+ * @param callable
+ * the thing to call
+ * @return return value of the callble
+ * @throws Exception
+ * forwarded from {@link Callable#call()}
+ */
+ synchronized <T> T execInCurrentThread(Callable<T> callable) throws Exception {
+ final Semaphore ready = new Semaphore(0);
+ final Semaphore finished = new Semaphore(0);
+ enqueue(new Runnable() {
+ public void run() {
+ ready.release();
+ try {
+ finished.acquire();
+ } catch (InterruptedException ie) {
+ __log.error("Thread interrupted.", ie);
+ throw new BpelEngineException("Thread interrupted.", ie);
+ }
+ }
+ });
+ try {
+ ready.acquire();
+ } catch (InterruptedException ex) {
+ __log.error("Thread interrupted.", ex);
+ throw new BpelEngineException("Thread interrupted.", ex);
+ }
+
+ _activeInstance.set(_iid);
+ try {
+ return callable.call();
+ } catch (Exception ex) {
+ throw ex;
+ } finally {
+ finished.release();
+ _activeInstance.set(null);
+ }
+
+ }
+
+
+
+ /**
+ * Implementation of the {@link Runnable} interface.
+ */
+ public void run() {
+ _activeInstance.set(_iid);
+ try {
+
+ do {
+ Runnable next;
+ synchronized (this) {
+ if (_todoQueue.isEmpty()) {
+ // This is the only way to drop out of this method short of some disasterous error. This is
+ // important since we need to synchronize _running with _todoQueue state.
+ _running = false;
+ return;
+ }
+
+ next = _todoQueue.remove(0);
+ }
+
+ try {
+ next.run();
+ } catch (Throwable t) {
+ __log.error("Unexpected error in instance thread.", t);
+ }
+ } while (true);
+ } finally {
+ _activeInstance.set(null);
+ }
+ }
+
+ public String toString() {
+ return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + _iid + "}";
+ }
+
+ public boolean isWorkerThread() {
+ return _activeInstance.get() != null;
+ }
+
+}
Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java?view=auto&rev=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java Wed Aug 1 10:07:35 2007
@@ -0,0 +1,56 @@
+package org.apache.ode.bpel.engine;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.WeakHashMap;
+
+/**
+ * A cache of {@link BpelInstanceWorker} objects.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class BpelInstanceWorkerCache {
+ private HashMap<Long, WeakReference<BpelInstanceWorker>> _cache = new HashMap<Long, WeakReference<BpelInstanceWorker>>();
+ private ReferenceQueue<BpelInstanceWorker> _refQ = new ReferenceQueue<BpelInstanceWorker>();
+
+ private BpelProcess _process;
+
+ public BpelInstanceWorkerCache(BpelProcess process) {
+ _process = process;
+ }
+
+ synchronized BpelInstanceWorker get(long iid) {
+ expungeStaleEntries();
+ WeakReference<BpelInstanceWorker> wref = _cache.get(iid);
+ BpelInstanceWorker worker;
+
+ // Case: not in cache.
+ if (wref == null) {
+ worker = new BpelInstanceWorker(_process, iid);
+ wref = new WeakReference<BpelInstanceWorker>(worker,_refQ);
+ _cache.put(iid, wref);
+ } else {
+ worker = wref.get();
+
+ // Case: garbage collected
+ if (worker == null) {
+ worker = new BpelInstanceWorker(_process, iid);
+ wref = new WeakReference<BpelInstanceWorker>(worker,_refQ);
+ _cache.put(iid, wref);
+ }
+ }
+
+ return worker;
+ }
+
+
+ private void expungeStaleEntries() {
+ Reference<? extends BpelInstanceWorker> x;
+ while ((x=_refQ.poll()) != null){
+ _cache.values().remove(x);
+ }
+ }
+}
Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorkerCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java Wed Aug 1 10:07:35 2007
@@ -40,6 +40,8 @@
* Implementation of the instance/process management interaction. This class implements
* the methods necessary to support process debugging. It also implements all the methods in the
* newer Process/Instance Management interface (pmapi).
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
*/
public class BpelManagementFacadeImpl extends ProcessAndInstanceManagementImpl
implements BpelManagementFacade {
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Aug 1 10:07:35 2007
@@ -19,6 +19,7 @@
package org.apache.ode.bpel.engine;
import java.io.InputStream;
+import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,6 +29,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import javax.wsdl.Operation;
@@ -72,6 +74,7 @@
import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.jacob.soup.ReplacementMap;
+import org.apache.ode.utils.GUID;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
@@ -82,10 +85,10 @@
/**
* Entry point into the runtime of a BPEL process.
*
- * @author Maciej Szefler
+ * @author Maciej Szefler <mszefler at gmail dot com>
* @author Matthieu Riou <mriou at apache dot org>
*/
-public class BpelProcess {
+class BpelProcess {
static final Log __log = LogFactory.getLog(BpelProcess.class);
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
@@ -140,15 +143,9 @@
final BpelServerImpl _server;
- /** Indicates whether we are operating in a server-managed transaction. */
- private ThreadLocal<Boolean> _serverTx = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return Boolean.FALSE;
- }
- };
+ final private List<WeakReference<MyRoleMessageExchangeImpl>> _mexStateListeners = new CopyOnWriteArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
- public BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
+ BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
_server = server;
_pid = conf.getProcessId();
_pconf = conf;
@@ -171,7 +168,7 @@
return "BpelProcess[" + _pid + "]";
}
- public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
+ void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
if (__log.isDebugEnabled())
__log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
markused();
@@ -180,13 +177,6 @@
// processInstance.recoverActivity(channel, activityId, action, fault);
}
- static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
- StringBuffer sb = new StringBuffer(partnerlinkName);
- sb.append('.');
- sb.append(operationName);
- return sb.toString();
- }
-
/**
* Entry point for message exchanges aimed at the my role.
*
@@ -204,6 +194,7 @@
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
mexdao.setFaultExplanation(errmsg);
mexdao.setStatus(Status.FAILURE.toString());
+ fireMexStateEvent(mexdao);
return;
}
@@ -249,7 +240,9 @@
// }
}
- void executeCreateInstance(MessageExchangeDAO mexdao) {
+ private void executeCreateInstance(MessageExchangeDAO mexdao) {
+ assert _hydrationLatch.isLatched(1);
+
BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
assert worker.isWorkerThread();
BpelRuntimeContextImpl instanceCtx = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), new PROCESS(_oprocess),
@@ -257,7 +250,9 @@
instanceCtx.execute();
}
- void executeContinueInstance(MessageExchangeDAO mexdao) {
+ private void executeContinueInstance(MessageExchangeDAO mexdao) {
+ assert _hydrationLatch.isLatched(1);
+
BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
assert worker.isWorkerThread();
BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
@@ -305,7 +300,9 @@
}
private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
- for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : getEndpointToMyRoleMap().entrySet()) {
+ assert _hydrationLatch.isLatched(1);
+
+ for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : _endpointToMyRoleMap.entrySet()) {
if (e.getKey().serviceName.equals(serviceName))
return e.getValue();
}
@@ -362,18 +359,7 @@
return null;
}
- /**
- * Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned.
- * If the part is an XML schema typed part, then the name of the part is returned in the null namespace.
- *
- * @param part
- * WSDL {@link javax.wsdl.Part}
- * @return name of element containing said part
- */
- static QName getElementNameForPart(OMessageVarType.Part part) {
- return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name);
- }
-
+
/**
* Process the message-exchange interceptors.
*
@@ -387,7 +373,7 @@
// for (MessageExchangeInterceptor i : _mexInterceptors)
// if (!mex.processInterceptor(i, mex, ictx, invoker))
// return false;
- // for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
+ // for (MessageExchangeInterceptor i : getEngine().getInterceptors())
// if (!mex.processInterceptor(i, mex, ictx, invoker))
// return false;
//
@@ -401,7 +387,7 @@
* @throws JobProcessorException
* @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
*/
- public void handleWorkEvent(final JobInfo jobInfo) throws JobProcessorException {
+ void handleWorkEvent(final JobInfo jobInfo) throws JobProcessorException {
assert !_contexts.isTransacted() : "work events must be received outside of a transaction";
markused();
@@ -445,7 +431,7 @@
* @param tx
* the transaction
*/
- private <T> Future<T> enqueueTransaction(final Callable<T> tx) {
+ <T> Future<T> enqueueTransaction(final Callable<T> tx) {
// We have to wrap our transaction to make sure that we are hydrated when the transaction runs.
return _server.enqueueTransaction(new ProcessCallable<T>(tx));
}
@@ -711,34 +697,63 @@
}
}
- MyRoleMessageExchangeImpl createMyRoleMex(MessageExchangeDAO mexdao) {
+ private MyRoleMessageExchangeImpl newMyRoleMex(
+ InvocationStyle istyle,
+ String mexId,
+ QName target,
+ OPartnerLink oplink,
+ Operation operation) {
+ MyRoleMessageExchangeImpl mex;
+ switch (istyle) {
+ case RELIABLE:
+ mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink,operation, target);
+ break;
+ case ASYNC:
+ mex = new AsyncMyRoleMessageExchangeImpl(this, mexId,oplink,operation,target);
+ break;
+ case TRANSACTED:
+ mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink,operation,target);
+ break;
+ case BLOCKING:
+ mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink,operation,target);
+ break;
+ default:
+ throw new AssertionError("Unexpected invocation style: " + istyle);
+
+ }
+
+ registerMyRoleMex(mex);
+ return mex;
+ }
+
+ MyRoleMessageExchangeImpl recreateMyRoleMex(MessageExchangeDAO mexdao) {
InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
_hydrationLatch.latch(1);
try {
- MyRoleMessageExchangeImpl mex;
- switch (istyle) {
- case RELIABLE:
- mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
- break;
- case ASYNC:
- mex = new AsyncMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
- break;
- case TRANSACTED:
- mex = new TransactedMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
- break;
- case BLOCKING:
- mex = new BlockingMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
- break;
- default:
- throw new AssertionError("Unexpected invocation style: " + istyle);
-
- }
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
- PortType ptype = plink.myRolePortType;
- Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+ if (plink == null) {
+ String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ mexdao.getMessageExchangeId() + " referenced unknown pLinkModelId " + mexdao.getPartnerLinkModelId());
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+
+ Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+ if (op == null) {
+ String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ mexdao.getMessageExchangeId() + " referenced unknown operation " + mexdao.getOperation());
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+
+ PartnerLinkMyRoleImpl myRole = _myRoles.get(plink);
+ if (myRole == null) {
+ String errmsg = __msgs.msgDbConsistencyError("MexDao #"+ mexdao.getMessageExchangeId() + " referenced non-existant myrole");
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+
+ MyRoleMessageExchangeImpl mex = newMyRoleMex(istyle, mexdao.getMessageExchangeId(), myRole._endpoint.serviceName, plink, op);
mex.load(mexdao);
- mex.init(ptype, op, MessageExchangePattern.valueOf(mexdao.getPattern()));
return mex;
} finally {
_hydrationLatch.release(1);
@@ -751,27 +766,26 @@
_hydrationLatch.latch(1);
try {
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
- PortType ptype = plink.partnerRolePortType;
Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
switch (istyle) {
case BLOCKING:
- mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+ mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case ASYNC:
- mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+ mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case TRANSACTED:
- mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /*
+ mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /*
* EPR
* todo
*/
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case RELIABLE:
- mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+ mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
@@ -779,7 +793,7 @@
throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
}
-
+
mex.load(mexdao);
return mex;
} finally {
@@ -792,22 +806,33 @@
return _invocationStyles;
}
- private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
- _hydrationLatch.latch(1);
- try {
- return _endpointToMyRoleMap;
- } finally {
- _hydrationLatch.release(1);
+ /**
+ * Find the partner-link-my-role that corresponds to the given service name.
+ *
+ * @param serviceName
+ * name of service
+ * @return corresponding {@link PartnerLinkMyRoleImpl}
+ */
+ private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
+ assert _hydrationLatch.isLatched(1);
+
+ PartnerLinkMyRoleImpl target = null;
+ for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
+ if (endpoint.serviceName.equals(serviceName))
+ target = _endpointToMyRoleMap.get(endpoint);
}
- }
- public ReplacementMap getReplacementMap() {
- _hydrationLatch.latch(1);
- try {
- return _replacementMap;
- } finally {
- _hydrationLatch.release(1);
- }
+ return target;
+
+ }
+
+ /**
+ * Used by {@link BpelRuntimeContextImpl} constructor. Should only be called from latched context.
+ * @return
+ */
+ ReplacementMap getReplacementMap() {
+ assert _hydrationLatch.isLatched(1);
+ return _replacementMap;
}
public boolean isInMemory() {
@@ -978,11 +1003,11 @@
}
- MessageExchangeDAO createMessageExchange(final char dir) {
+ MessageExchangeDAO createMessageExchange(String mexId, final char dir) {
if (isInMemory()) {
- return _inMemDao.getConnection().createMessageExchange(dir);
+ return _inMemDao.getConnection().createMessageExchange(mexId, dir);
} else {
- return _contexts.dao.getConnection().createMessageExchange(dir);
+ return _contexts.dao.getConnection().createMessageExchange(mexId, dir);
}
}
@@ -996,15 +1021,13 @@
*
* @param runnable
*/
- public void scheduleRunnable(final Runnable runnable) {
+ void scheduleRunnable(final Runnable runnable) {
if (__log.isDebugEnabled())
__log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
_server.scheduleRunnable(new ProcessRunnable(runnable));
}
-
-
class ProcessRunnable implements Runnable {
Runnable _work;
@@ -1043,69 +1066,52 @@
}
- public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService, final String operation, final String clientKey) {
+ public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle,
+ final QName targetService,
+ final String operation,
+ final String clientKey) {
+
+ final String mexId = new GUID().toString();
_hydrationLatch.latch(1);
try {
-
- final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
- if (target == null)
- throw new BpelEngineException("NoSuchService: " + targetService);
- final Operation op = target._plinkDef.getMyRoleOperation(operation);
- if (op == null)
- throw new BpelEngineException("NoSuchOperation: " + operation);
-
- final MessageExchangePattern pattern = op.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
- : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
- Callable<MyRoleMessageExchange> createDao = new Callable<MyRoleMessageExchange>() {
+ final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
+ if (target == null)
+ throw new BpelEngineException("NoSuchService: " + targetService);
+ final Operation op = target._plinkDef.getMyRoleOperation(operation);
+ if (op == null)
+ throw new BpelEngineException("NoSuchOperation: " + operation);
- public MyRoleMessageExchange call() throws Exception {
- MessageExchangeDAO dao = createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
- dao.setInvocationStyle(istyle.toString());
- dao.setCorrelationId(clientKey);
- dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
- dao.setPattern(pattern.toString());
- dao.setCallee(targetService);
- dao.setStatus(Status.NEW.toString());
- dao.setOperation(operation);
- dao.setPartnerLinkModelId(target._plinkDef.getId());
- dao.setTimeout(30 * 1000); // default timeout is 30 seconds, can be chaged by client.
- return createMyRoleMex(dao);
- }
-
- };
-
- try {
- if (isInMemory() || istyle == InvocationStyle.TRANSACTED || istyle == InvocationStyle.RELIABLE)
- return createDao.call();
- else
- return _contexts.execTransaction(createDao);
+ return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, target._plinkDef, op);
- } catch (BpelEngineException be) {
- throw be;
- } catch (Exception e) {
- __log.error("Internal Error: could not create message exchange.", e);
- throw new BpelEngineException("Internal Error", e);
+ } finally {
+ _hydrationLatch.release(1);
}
-
- } finally {
- _hydrationLatch.release(1);
- }
}
- /**
- * Find the partner-link-my-role that corresponds to the given service name.
- * @param serviceName name of service
- * @return corresponding {@link PartnerLinkMyRoleImpl}
- */
- private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
- PartnerLinkMyRoleImpl target = null;
- for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
- if (endpoint.serviceName.equals(serviceName))
- target = getEndpointToMyRoleMap().get(endpoint);
+ void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+ _mexStateListeners.add(new WeakReference<MyRoleMessageExchangeImpl>(mymex));
+ }
+
+ void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+ ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
+ for (WeakReference<MyRoleMessageExchangeImpl> wref : _mexStateListeners) {
+ MyRoleMessageExchangeImpl mex = wref.get();
+ if (mex == null || mex == mymex)
+ needsRemoval.add(wref);
+ }
+ _mexStateListeners.removeAll(needsRemoval);
+
+ }
+
+ void fireMexStateEvent(MessageExchangeDAO mexdao) {
+ for (WeakReference<MyRoleMessageExchangeImpl> wr: _mexStateListeners) {
+ MyRoleMessageExchangeImpl mymex = wr.get();
+ if (mymex != null && mymex.getMessageExchangeId() != null)
+ mymex.onStateChanged(mexdao);
}
-
- return target;
}
+
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Aug 1 10:07:35 2007
@@ -135,7 +135,9 @@
private boolean _executed;
- public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
+ public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
+ ProcessInstanceDAO dao,
+ PROCESS PROCESS,
MessageExchangeDAO instantiatingMessageExchange) {
_instanceWorker = instanceWorker;
_bpelProcess = instanceWorker._process;
@@ -558,7 +560,6 @@
scheduleAsyncResponse(myrolemex);
break;
default:
- // DO NOTHING
break;
}
@@ -693,7 +694,7 @@
evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
- MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+ MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
mexDao.setStatus(MessageExchange.Status.NEW.toString());
mexDao.setOperation(operation.getName());
mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
@@ -779,22 +780,22 @@
if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
// If RELIABLE is supported, this is easy, we just do it in-line.
ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
_contexts.mexContext.invokePartnerReliable(reliableMex);
} else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
// If TRANSACTED is supported, this is again easy, do it in-line.
TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
_contexts.mexContext.invokePartnerTransacted(transactedMex);
} else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
// For BLOCKING invocation, we defer the call until after commit (unless idempotent).
BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
schedule(new BlockingInvoker(blockingMex));
} else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
// For ASYNC style, we defer the call until after commit (unless idempotent).
AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
schedule(new AsyncInvoker(asyncMex));
} else {
@@ -866,7 +867,7 @@
if (__log.isDebugEnabled())
__log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
- MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
myRoleMex.setCallee(serviceName);
myRoleMex.setPipedMessageExchange(partnerRoleMex);
myRoleMex.setOperation(partnerRoleMex.getOperation());
@@ -1393,7 +1394,7 @@
assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
assert _contexts.isTransacted();
- final MyRoleMessageExchangeImpl mex = _bpelProcess.createMyRoleMex(mexdao);
+ final MyRoleMessageExchangeImpl mex = _bpelProcess.recreateMyRoleMex(mexdao);
_pendingMyRoleReplies.add(mex);
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Aug 1 10:07:35 2007
@@ -18,7 +18,6 @@
*/
package org.apache.ode.bpel.engine;
-import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -57,9 +56,6 @@
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -89,8 +85,6 @@
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
- private final List<WeakReference<MessageExchangeStateListener>> _mexStateListeners = new ArrayList<WeakReference<MessageExchangeStateListener>>();
-
/** Maximum age of a process before it is quiesced */
private static Long __processMaxAge;
@@ -525,7 +519,7 @@
case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
return process.createPartnerRoleMex(mexdao);
case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
- return process.createMyRoleMex(mexdao);
+ return process.recreateMyRoleMex(mexdao);
default:
String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
__log.fatal(errmsg);
@@ -584,11 +578,6 @@
return null;
}
- void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
- WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
-
- }
-
OProcess getOProcess(QName processId) {
_mngmtLock.readLock().lock();
try {
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -37,6 +37,7 @@
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
@@ -73,32 +74,29 @@
final BpelProcess _process;
+ final OPartnerLink _oplink;
+
+ /** Message-exchange id. */
final String _mexId;
- /** Instance identifier. */
- Long _iid;
+ final PortType _portType;
- PortType _portType;
+ final Operation _operation;
- Operation _operation;
+ /** Instance identifier. */
+ Long _iid;
EndpointReference _epr;
- MessageExchangePattern _pattern;
-
- String _opname;
-
MessageImpl _request;
- boolean _associated;
-
/** The point at which this message-exchange will time out. */
- long _timeout;
+ long _timeout = 30 * 1000;
//
// The following fields need to be volatile, since a random IL thread may set them.
//
- volatile Status _status;
+ private volatile Status _status = Status.NEW;
volatile QName _fault;
@@ -132,23 +130,30 @@
private Set<String> _propNames;
- public MessageExchangeImpl(BpelProcess process, String mexId) {
+
+ public MessageExchangeImpl(
+ BpelProcess process,
+ String mexId,
+ OPartnerLink oplink,
+ PortType ptype,
+ Operation operation) {
_process = process;
- _contexts = process._contexts;
+ _contexts = process._contexts;
_mexId = mexId;
+ _oplink = oplink;
+ _portType = ptype;
+ _operation = operation;
}
@Override
public boolean equals(Object other) {
return _mexId.equals(((MessageExchangeImpl)other)._mexId);
}
+
void load(MessageExchangeDAO dao) {
- if (!dao.getMessageExchangeId().equals(_mexId))
- throw new IllegalArgumentException("MessageExchangeId mismatch!");
- _pattern = MessageExchangePattern.valueOf(dao.getPattern());
- _opname = dao.getOperation();
_timeout = dao.getTimeout();
+ _iid = dao.getInstance() != null ? dao.getInstance().getInstanceId() : null;
if (_fault == null)
_fault = dao.getFault();
@@ -159,13 +164,21 @@
}
public void save(MessageExchangeDAO dao) {
+ dao.setPartnerLinkModelId(_oplink.getId());
+ dao.setOperation(_operation.getName());
dao.setStatus(_status.toString());
dao.setInvocationStyle(getInvocationStyle().toString());
dao.setFault(_fault);
dao.setFaultExplanation(_explanation);
dao.setTimeout(_timeout);
- // todo: set failureType
+ dao.setFailureType(_failureType == null ? null : _failureType.toString());
+
+ if (_changes.contains(Change.REQUEST)) {
+ MessageDAO requestDao = dao.createMessage(_request.getType());
+ requestDao.setData(_request.getMessage());
+ }
+
if (_changes.contains(Change.RESPONSE)) {
MessageDAO responseDao = dao.createMessage(_response.getType());
responseDao.setData(_response.getMessage());
@@ -212,11 +225,11 @@
}
public String getOperationName() throws BpelEngineException {
- return _opname;
+ return getOperation().getName();
}
public MessageExchangePattern getMessageExchangePattern() {
- return _pattern;
+ return _operation.getOutput()==null ? MessageExchangePattern.REQUEST_ONLY : MessageExchangePattern.REQUEST_RESPONSE;
}
public boolean isTransactional() throws BpelEngineException {
@@ -283,14 +296,7 @@
});
}
- void init(PortType portType, Operation operation, MessageExchangePattern pattern) {
- if (__log.isTraceEnabled())
- __log.trace("Mex[" + getMessageExchangeId() + "].setPortOp(" + portType + "," + operation + ")");
- _portType = portType;
- _operation = operation;
- _pattern = pattern;
- }
-
+
void setFault(QName faultType, Message outputFaultMessage) throws BpelEngineException {
setStatus(Status.FAULT);
_fault = faultType;
@@ -409,7 +415,7 @@
return action.call(getDAO());
} else {
try {
- return _process._server.enqueueTransaction(new Callable<T>() {
+ return _process.enqueueTransaction(new Callable<T>() {
public T call() throws Exception {
assertTransaction();
return action.call(getDAO());
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java Wed Aug 1 10:07:35 2007
@@ -190,4 +190,8 @@
return format("Scheduled job failed; jobDetail={0}", jobDetail);
}
+ public String msgDbConsistencyError(String detail) {
+ return format("Database consistency error: {0}", detail);
+ }
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -1,8 +1,10 @@
package org.apache.ode.bpel.engine;
+import java.lang.ref.WeakReference;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import javax.wsdl.Operation;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -17,21 +19,27 @@
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.o.OPartnerLink;
abstract class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
+ protected final QName _callee;
+
protected CorrelationStatus _cstatus;
protected String _clientId;
- protected QName _callee;
-
- public MyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
- super(process, mexId);
+ public MyRoleMessageExchangeImpl(BpelProcess process,
+ String mexId,
+ OPartnerLink oplink,
+ Operation operation,
+ QName callee) {
+ super(process, mexId, oplink, oplink.myRolePortType, operation);
+ _callee = callee;
}
-
+
public CorrelationStatus getCorrelationStatus() {
return _cstatus;
}
@@ -41,7 +49,6 @@
super.load(dao);
_cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
_clientId = dao.getCorrelationId();
- _callee = dao.getCallee();
}
@Override
@@ -83,7 +90,7 @@
public String toString() {
try {
- return "{MyRoleMex#" + _mexId + " [Client " + _clientId + "] calling " + _callee + "." + _opname + "(...)}";
+ return "{MyRoleMex#" + _mexId + " [Client " + _clientId + "] calling " + _callee + "." + getOperationName() + "(...)}";
} catch (Throwable t) {
return "{MyRoleMex#???}";
}
@@ -94,7 +101,7 @@
}
- protected void scheduleInvoke(BpelProcess target) {
+ protected void scheduleInvoke() {
assert !_process.isInMemory() : "Cannot schedule invokes for in-memory processes.";
assert _contexts.isTransacted() : "Cannot schedule outside of transaction context.";
@@ -102,13 +109,13 @@
// Schedule a new job for invocation
final WorkEvent we = new WorkEvent();
we.setType(WorkEvent.Type.MYROLE_INVOKE);
- we.setProcessId(target.getPID());
+ we.setProcessId(_process.getPID());
we.setMexId(_mexId);
// Schedule a timeout
final WorkEvent we1 = new WorkEvent();
we1.setType(WorkEvent.Type.MYROLE_INVOKE_TIMEOUT);
- we1.setProcessId(target.getPID());
+ we1.setProcessId(_process.getPID());
we1.setMexId(_mexId);
setStatus(Status.ASYNC);
@@ -155,11 +162,13 @@
}
- /**
- * Callback.
- *
- * @param mexdao
- */
- protected void onMessageExchangeComplete(MessageExchangeDAO mexdao) {
+ protected void onStateChanged(MessageExchangeDAO mexdao) {
+ setStatus(Status.valueOf(mexdao.getStatus()));
}
+
+
+ protected void finalize() {
+ _process.unregisterMyRoleMex(this);
+ }
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/NStateLatch.java Wed Aug 1 10:07:35 2007
@@ -107,4 +107,8 @@
_lock.unlock();
}
}
+
+ public boolean isLatched(int state) {
+ return _state == state && _depth > 0;
+ }
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -20,7 +20,6 @@
package org.apache.ode.bpel.engine;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -33,6 +32,7 @@
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.o.OPartnerLink;
import org.w3c.dom.Element;
/**
@@ -64,14 +64,11 @@
volatile boolean _blocked = false;
- PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType portType, Operation operation,
+ PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,
EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
- super(process, mexId);
+ super(process, mexId, oplink, oplink.partnerRolePortType, operation);
_myRoleEPR = myRoleEPR;
_partnerRoleChannel = channel;
-
- init(portType, operation, (operation.getOutput() == null) ? MessageExchangePattern.REQUEST_ONLY
- : MessageExchangePattern.REQUEST_RESPONSE);
}
@Override
@@ -150,7 +147,7 @@
public String toString() {
try {
- return "{PartnerRoleMex#" + _mexId + " [PID " + getCaller() + "] calling " + _epr + "." + _opname + "(...)}";
+ return "{PartnerRoleMex#" + _mexId + " [PID " + getCaller() + "] calling " + _epr + "." + getOperationName() + "(...)}";
} catch (Throwable t) {
return "{PartnerRoleMex#????}";
@@ -184,8 +181,8 @@
protected void checkReplyContextOk() {
// Prevent duplicate replies.
- if (_status != MessageExchange.Status.REQUEST && _status != MessageExchange.Status.ASYNC)
- throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + _status);
+ if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC)
+ throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
// In-memory processe are special, they don't allow scheduling so any replies must be delivered immediately.
if (!_blocked && _process.isInMemory())
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -35,7 +35,9 @@
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.o.OPartnerLink;
+import javax.wsdl.Operation;
import javax.xml.namespace.QName;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,21 +59,20 @@
public static final int TIMEOUT = 2 * 60 * 1000;
- public ReliableMyRoleMessageExchangeImpl(BpelProcess process, String mexId) {
- super(process, mexId);
+ public ReliableMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+ super(process, mexId, oplink, operation, callee);
}
-
public void invokeReliable() {
// For reliable, we MUST HAVE A TRANSACTION!
assertTransaction();
// Cover the case where invoke was already called.
- if (_status == Status.REQUEST)
+ if (getStatus() == Status.REQUEST)
return;
- if (_status != Status.NEW)
- throw new BpelEngineException("Invalid state: " + _status);
+ if (getStatus() != Status.NEW)
+ throw new BpelEngineException("Invalid state: " + getStatus());
if (!processInterceptors(InterceptorInvoker.__onBpelServerInvoked, getDAO())) {
throw new BpelEngineException("Intercepted.");
@@ -81,7 +82,7 @@
__log.debug("invoke() EPR= " + _epr + " ==> " + _process);
setStatus(Status.REQUEST);
save(getDAO());
- scheduleInvoke(_process);
+ scheduleInvoke();
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -1,7 +1,6 @@
package org.apache.ode.bpel.engine;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
@@ -10,11 +9,12 @@
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.o.OPartnerLink;
public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
- public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, PortType ptype, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
- super(process, mexId, ptype, op, epr, myRoleEPR, partnerRoleChannel);
+ public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
+ super(process, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel);
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Wed Aug 1 10:07:35 2007
@@ -1,9 +1,13 @@
package org.apache.ode.bpel.engine;
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.o.OPartnerLink;
/**
* Transacted my-role message exchange.
@@ -15,8 +19,8 @@
*/
public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
- public TransactedMyRoleMessageExchangeImpl(BpelProcess target, String mexId) {
- super(target, mexId);
+ public TransactedMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+ super(process, mexId, oplink, operation, callee);
}
@Override
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Wed Aug 1 10:07:35 2007
@@ -181,10 +181,9 @@
throw new UnsupportedOperationException("Can't query process configuration using a transient DAO.");
}
- public MessageExchangeDAO createMessageExchange(char dir) {
- String id = Long.toString(counter.getAndIncrement());
- MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir, id);
- _mexStore.put(id, mex);
+ public MessageExchangeDAO createMessageExchange(String mexId, char dir) {
+ MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir, mexId);
+ _mexStore.put(mexId, mex);
return mex;
}
Modified: ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=561873&r1=561872&r2=561873
==============================================================================
--- ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Wed Aug 1 10:07:35 2007
@@ -342,4 +342,9 @@
}
}
+ public void waitForBlocking() {
+ // TODO Auto-generated method stub
+
+ }
+
}