You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/24 22:58:14 UTC
svn commit: r559204 [2/3] - in /incubator/ode/branches/bart: ./
axis2/src/main/java/org/apache/ode/axis2/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-epr/src/main/java/org/apache/ode/il/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ ...
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Jul 24 13:58:12 2007
@@ -18,6 +18,19 @@
*/
package org.apache.ode.bpel.engine;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.CorrelationKey;
@@ -44,21 +57,17 @@
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
import org.apache.ode.bpel.o.OMessageVarType;
-import org.apache.ode.bpel.o.OMessageVarType.Part;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.OScope;
+import org.apache.ode.bpel.o.OMessageVarType.Part;
import org.apache.ode.bpel.runtime.BpelJacobRunnable;
import org.apache.ode.bpel.runtime.BpelRuntimeContext;
import org.apache.ode.bpel.runtime.CorrelationSetInstance;
@@ -80,21 +89,9 @@
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.ObjectPrinter;
-import org.omg.CORBA._PolicyStub;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
/**
*
*
@@ -118,15 +115,17 @@
private MessageExchangeDAO _instantiatingMessageExchange;
- /** Object for keeping track of all the outstanding <invoke>s. */
+ /** Object for keeping track of all the outstanding <pick>/<receive> activities */
private OutstandingRequestManager _outstandingRequests;
- /** List of BLOCKING invocations that need to be deferred until the end of the current TX */
- private List<PartnerRoleMessageExchange> _todoBlockingCalls = new LinkedList<PartnerRoleMessageExchange>();
+ /** List of pending invocations that need to be deferred until the end of the current TX */
+ private List<PartnerRoleMessageExchangeImpl> _pendingPartnerRoleInvokes = new LinkedList<PartnerRoleMessageExchangeImpl>();
- /** List of ASYNC invocations that need to be deferred until the end of the current TX. */
- private List<PartnerRoleMessageExchange> _todoAsyncCalls = new LinkedList<PartnerRoleMessageExchange>();
+ /** List of pending ASYNC responses that need to be deferred until the end of the current TX. */
+ private List<MyRoleMessageExchangeImpl> _pendingMyRoleReplies = new LinkedList<MyRoleMessageExchangeImpl>();
+ private BpelInstanceWorker _instanceWorker;
+
private BpelProcess _bpelProcess;
/** Five second maximum for continuous execution. */
@@ -134,10 +133,13 @@
private Contexts _contexts;
- public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
+ private boolean _executed;
+
+ public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
MessageExchangeDAO instantiatingMessageExchange) {
- _bpelProcess = bpelProcess;
- _contexts = bpelProcess._contexts;
+ _instanceWorker = instanceWorker;
+ _bpelProcess = instanceWorker._process;
+ _contexts = instanceWorker._contexts;
_dao = dao;
_iid = dao.getInstanceId();
_instantiatingMessageExchange = instantiatingMessageExchange;
@@ -149,24 +151,16 @@
_outstandingRequests = new OutstandingRequestManager();
_vpu.setContext(_soup);
- if (bpelProcess.isInMemory()) {
- ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
- if (inmem.getSoup() != null) {
- _soup = (ExecutionQueueImpl) inmem.getSoup();
- _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
- _vpu.setContext(_soup);
- }
- } else {
- byte[] daoState = dao.getExecutionState();
- if (daoState != null) {
- ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
- try {
- _soup.read(iis);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
+ byte[] daoState = dao.getExecutionState();
+ if (daoState != null) {
+ assert !_bpelProcess.isInMemory() : "did not expect to rehydrate in-mem process!";
+ ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
+ try {
+ _soup.read(iis);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
+ _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
}
if (PROCESS != null) {
@@ -232,7 +226,7 @@
sendEvent(new ProcessCompletionEvent(faultData.getFaultName()));
_dao.finishCompletion();
- faultOutstandingMessageExchanges(faultData);
+ cleanupOutstandingMyRoleExchanges(faultData);
}
/**
@@ -253,7 +247,7 @@
sendEvent(new ProcessCompletionEvent(null));
_dao.finishCompletion();
- completeOutstandingMessageExchanges();
+ cleanupOutstandingMyRoleExchanges();
}
/**
@@ -313,7 +307,7 @@
evt.setNewState(ProcessState.STATE_READY);
sendEvent(evt);
} else if (_bpelProcess.isInMemory()) {
- // This condition should be detected with static analysis, but just in case.
+ // This condition should be detected with static analysis, but just in case.
throw new InvalidProcessException("In-memory process must not receive additional messages.");
}
@@ -558,11 +552,11 @@
InvocationStyle istyle = InvocationStyle.valueOf(myrolemex.getInvocationStyle());
switch (istyle) {
case RELIABLE:
- scheduleReliableResponse(myrolemex.getMessageExchangeId());
- break;
+ scheduleReliableResponse(myrolemex);
+ break;
case ASYNC:
- scheduleAsyncResponse(myrolemex.getMessageExchangeId());
- break;
+ scheduleAsyncResponse(myrolemex);
+ break;
default:
// DO NOTHING
break;
@@ -572,6 +566,7 @@
sendEvent(evt);
}
+
/**
* @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
* org.apache.ode.bpel.common.CorrelationKey)
@@ -646,7 +641,7 @@
sendEvent(evt);
sendEvent(new ProcessTerminationEvent());
- failOutstandingMessageExchanges();
+ cleanupOutstandingMyRoleExchanges();
}
public void registerTimer(TimerResponseChannel timerChannel, Date timeToFire) {
@@ -663,7 +658,7 @@
we.setType(WorkEvent.Type.MATCHER);
we.setCorrelatorId(correlatorId);
we.setCorrelationKey(key);
- _contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
}
public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
@@ -765,46 +760,59 @@
*/
private void invokeIL(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
PartnerRoleChannel partnerRoleChannel, EndpointReference partnerEpr, MessageExchangeDAO mexDao) {
- if (partnerEpr != null) {
- // If we couldn't find the endpoint, then there is no sense
- // in asking the IL to invoke.
- mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
- mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
- Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
- if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
- // If RELIABLE is supported, this is easy, we just do it in-line.
- throw new UnsupportedOperationException(); // TODO
- ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl();
- _contexts.mexContext.invokePartnerReliable(reliableMex);
- } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
- // If TRANSACTED is supported, this is again easy, do it in-line.
- throw new UnsupportedOperationException(); // TODO
- TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl();
- _contexts.mexContext.invokePartnerTransacted(transactedMex);
- } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
- // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
- BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl();
- _todoBlockingCalls.add(blockingMex);
- } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
- // For ASYNC style, we defer the call until after commit (unless idempotent).
- AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl();
- _todoAsyncCalls.add(asyncMex);
- } else {
- // This really should not happen, indicates IL is screwy.
- __log.error("Integration Layer did not agree to any known invocation style for EPR "
- + DOMUtils.domToString(partnerEPR));
- mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
- mexDao.setStatus(Status.FAILURE.toString());
- mexDao.setFaultExplanation("NoMatchingStyle");
- }
-
- } else {
- __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
+ // If we couldn't find the endpoint, then there is no sense
+ // in asking the IL to invoke.
+ if (partnerEpr == null) {
+ __log.error("Couldn't find endpoint for partner EPR ");
mexDao.setFailureType(FailureType.UNKNOWN_ENDPOINT.toString());
mexDao.setFaultExplanation("UnknownEndpoint");
mexDao.setStatus(Status.FAILURE.toString());
- // , partnerEPR);
+ return;
}
+
+ PortType portType = partnerLink.partnerLink.partnerRolePortType;
+ EndpointReference myRoleEpr = null; // TODO: how did we get this?
+
+ mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
+ mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
+ Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
+ if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+ // If RELIABLE is supported, this is easy, we just do it in-line.
+ ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
+ .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ _contexts.mexContext.invokePartnerReliable(reliableMex);
+ } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+ // If TRANSACTED is supported, this is again easy, do it in-line.
+ TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ _contexts.mexContext.invokePartnerTransacted(transactedMex);
+ } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
+ // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
+ BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
+ .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ schedule(new BlockingInvoker(blockingMex));
+ } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
+ // For ASYNC style, we defer the call until after commit (unless idempotent).
+ AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
+ .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+ schedule(new AsyncInvoker(asyncMex));
+
+ } else {
+ // This really should not happen, indicates IL is screwy.
+ __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
+ mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+ mexDao.setStatus(Status.FAILURE.toString());
+ mexDao.setFaultExplanation("NoMatchingStyle");
+ }
+
+ }
+
+    /**
+     * Defers a task until the current transaction has committed; once it has, the task is
+     * handed to the instance worker's queue.
+     *
+     * @param runnable task to enqueue on the instance worker after a successful commit
+     */
+    private void schedule(final Runnable runnable) {
+        final Runnable enqueueOnCommit = new Runnable() {
+            public void run() {
+                _instanceWorker.enqueue(runnable);
+            }
+        };
+        _contexts.registerCommitSynchronizer(enqueueOnCommit);
     }
/**
@@ -821,6 +829,31 @@
__log.debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId());
}
+ // following code is used to determine the style of invocation. note that for p2p, this is a bit of an
+ // "approximation", since we are using non-public mechanisms to control the child process. In any case
+ // the "in-memory" status of the caller and callee play an important role in determining the style.
+ InvocationStyle style;
+ if (_bpelProcess.isInMemory()) {
+ if (operation.getOutput() == null)
+ style = InvocationStyle.RELIABLE;
+ else if (target.isInMemory())
+ style = InvocationStyle.TRANSACTED;
+ else if (target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
+ style = InvocationStyle.TRANSACTED;
+ else
+ style = InvocationStyle.BLOCKING;
+ } else /* persisted */{
+
+ if (operation.getOutput() != null
+ && target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
+ style = InvocationStyle.TRANSACTED;
+ else
+ style = InvocationStyle.RELIABLE;
+
+ }
+
+ partnerRoleMex.setInvocationStyle(style.toString());
+
// Properties used by stateful-exchange protocol.
String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
String partnerSessionId = partnerRoleMex.getPartnerLink().getPartnerSessionId();
@@ -840,6 +873,7 @@
myRoleMex.setPattern(partnerRoleMex.getPattern());
myRoleMex.setTimeout(partnerRoleMex.getTimeout());
myRoleMex.setRequest(partnerRoleMex.getRequest());
+ myRoleMex.setInvocationStyle(style.toString());
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + partnerSessionId + " - partnerSess "
@@ -857,7 +891,13 @@
}
void execute() {
+ if (!_contexts.isTransacted())
+ throw new BpelEngineException("MUST RUN IN TRANSACTION!");
+ if (_executed)
+ throw new IllegalStateException("cannot call execute() twice!");
+
long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
+
// Execute the process state reductions
boolean canReduce = true;
@@ -889,12 +929,12 @@
// Max time exceeded (possibly an infinite loop).
if (__log.isDebugEnabled())
__log.debug("MaxTime exceeded for instance # " + _iid);
-
- // NOTE: we never ever schedule anything for in-mem processes, they have to finish in a single
- // go.
+
+ // NOTE: we never ever schedule anything for in-mem processes, they have to finish in a single
+ // go.
if (_bpelProcess.isInMemory())
throw new BpelEngineException("In-memory process timeout.");
-
+
try {
WorkEvent we = new WorkEvent();
we.setIID(_iid);
@@ -1074,61 +1114,43 @@
}
}
- private void completeOutstandingMessageExchanges() {
+    /**
+     * Called when the process completes to clean up any outstanding message exchanges.
+     * <p>
+     * Every exchange still held by the outstanding-request manager is released. Request-only
+     * exchanges are marked <code>COMPLETED_OK</code>; all others get the <code>NO_RESPONSE</code>
+     * failure type and a fault explanation, and a response is scheduled when the exchange's
+     * invocation style is RELIABLE or ASYNC.
+     *
+     * @param optionalFaultData fault the process ended with, or <code>null</code> if there is none;
+     *            when given, its string form is recorded as the fault explanation
+     */
+    private void cleanupOutstandingMyRoleExchanges(FaultData optionalFaultData) {
         String[] mexRefs = _outstandingRequests.releaseAll();
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-
-
-                switch (mex.getStatus()) {
-                case ASYNC:
-                case RESPONSE:
-                    mex.setStatus(MessageExchange.Status.COMPLETED_OK);
-                    break;
-                case REQUEST:
-                    if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
-                        mex.setStatus(MessageExchange.Status.COMPLETED_OK);
-                        break;
-                    }
-                default:
-                    mex.setFailure(FailureType.OTHER, "No response.", null);
-                    _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
-                    mex.release();
+                // The parsed status is not used below; valueOf() still rejects an unknown status string.
+                Status status = MessageExchange.Status.valueOf(mexDao.getStatus());
+                MessageExchangePattern pattern = MessageExchange.MessageExchangePattern.valueOf(mexDao.getPattern());
+                InvocationStyle istyle = InvocationStyle.valueOf(mexDao.getInvocationStyle());
+                if (pattern == MessageExchangePattern.REQUEST_ONLY) {
+                    // Nobody is waiting for a reply to a one-way exchange.
+                    mexDao.setStatus(Status.COMPLETED_OK.toString());
+                    continue;
                 }
-            }
-        }
-    }
-    private void faultOutstandingMessageExchanges(FaultData faultData) {
-        String[] mexRefs = _outstandingRequests.releaseAll();
-        for (String mexId : mexRefs) {
-            MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
-            if (mexDao != null) {
-                ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
-                _bpelProcess.initMyRoleMex(mex);
+                mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
+                // Use the generic explanation only when there is no fault data; it used to be set
+                // unconditionally, which overwrote the explanation taken from the fault.
+                if (optionalFaultData != null) {
+                    mexDao.setFaultExplanation(optionalFaultData.toString());
+                } else {
+                    mexDao.setFaultExplanation("Process completed without responding.");
+                }
-                Message message = mex.createMessage(faultData.getFaultName());
-                if (faultData.getFaultMessage() != null)
-                    message.setMessage(faultData.getFaultMessage());
-                mex.setResponse(message);
+                switch (istyle) {
+                case RELIABLE:
+                    scheduleReliableResponse(mexDao);
+                    break;
+                case ASYNC:
+                    scheduleAsyncResponse(mexDao);
+                }
-                mex.setFault(faultData.getFaultName(), message);
-                mex.setFaultExplanation(faultData.getExplanation());
-                _contexts.mexContext.onAsyncReply(mex);
             }
         }
     }
- private void failOutstandingMessageExchanges() {
- String[] mexRefs = _outstandingRequests.releaseAll();
- for (String mexId : mexRefs) {
- MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
- ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess, mexDao);
- _bpelProcess.initMyRoleMex(mex);
- mex.setFailure(FailureType.OTHER, "No response.", null);
- _contexts.mexContext.onAsyncReply(mex);
- }
+ /**
+ * Cleans up the outstanding my-role message exchanges when there is no fault data to record;
+ * delegates to {@link #cleanupOutstandingMyRoleExchanges(FaultData)} with <code>null</code>.
+ */
+ private void cleanupOutstandingMyRoleExchanges() {
+ cleanupOutstandingMyRoleExchanges(null);
}
public Element getPartnerResponse(String mexId) {
@@ -1361,34 +1383,28 @@
}
}
-
-
/**
- * Add a scheduled ASYNC response.
+ * Add a scheduled ASYNC response.
*
* @param mexdao DAO of the message exchange whose ASYNC reply is added to the pending my-role replies
*/
- private void scheduleAsyncResponse(String messageExchangeId) {
+ private void scheduleAsyncResponse(MessageExchangeDAO mexdao) {
assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
- assert _contexts.scheduler.isTransacted();
-
- WorkEvent we = new WorkEvent();
- we.setIID(_iid);
- we.setMexId(messageExchangeId);
- we.setProcessId(_bpelProcess.getPID());
- we.setType(WorkEvent.Type.MYROLE_INVOKE_ASYNC_RESPONSE);
- _contexts.scheduler.scheduleVolatileJob(false,null);
-
+ assert _contexts.isTransacted();
+
+ final MyRoleMessageExchangeImpl mex = _bpelProcess.createMyRoleMex(mexdao);
+ _pendingMyRoleReplies.add(mex);
}
- private void scheduleReliableResponse(String messageExchangeId) {
+
+ private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
- assert _contexts.scheduler.isTransacted();
-
+ assert _contexts.isTransacted();
+
WorkEvent we = new WorkEvent();
we.setIID(_iid);
- we.setMexId(messageExchangeId);
+ we.setMexId(messageExchange.getMessageExchangeId());
we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.MYROLE_INVOKE_ASYNC_RESPONSE);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
@@ -1403,6 +1419,31 @@
private void continuePartnerReplied(MessageExchangeDAO pmex) {
}
+
+ /**
+ * Task that invokeIL(...) passes to schedule(...) when the partner supports the BLOCKING
+ * invocation style. It is still a stub: the exchange given to the constructor is not retained
+ * and run() does nothing.
+ */
+ class BlockingInvoker implements Runnable {
+ public BlockingInvoker(BlockingPartnerRoleMessageExchangeImpl blockingMex) {
+ // TODO: keep a reference to blockingMex; the argument is currently discarded
+ }
+ public void run() {
+ // TODO: perform the deferred BLOCKING invocation; currently a no-op
+
+ }
+
+ }
+
+
+ /**
+ * Task that invokeIL(...) passes to schedule(...) when the partner supports the ASYNC
+ * invocation style. It is still a stub: the exchange given to the constructor is not retained
+ * and run() does nothing.
+ */
+ class AsyncInvoker implements Runnable {
+
+ public AsyncInvoker(AsyncPartnerRoleMessageExchangeImpl asyncMex) {
+ // TODO: keep a reference to asyncMex; the argument is currently discarded
+ }
+
+ public void run() {
+ // TODO: perform the deferred ASYNC invocation; currently a no-op
+
+ }
+
+ }
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jul 24 13:58:12 2007
@@ -27,6 +27,9 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,8 +67,6 @@
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.MemberOfFunction;
-import com.sun.corba.se.spi.activation._ActivatorImplBase;
-
/**
* <p>
* The BPEL server implementation.
@@ -114,6 +115,8 @@
private Properties _configProperties;
+ private ExecutorService _exec;
+
BpelDatabase _db;
/**
@@ -156,6 +159,10 @@
__log.debug("BPEL SERVER starting.");
+
+ if (_exec == null)
+ _exec = Executors.newCachedThreadPool();
+
_contexts.scheduler.start();
_state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
@@ -226,7 +233,7 @@
__log.debug("BPEL SERVER initializing ");
- _db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
+ _db = new BpelDatabase(_contexts);
_state = State.INIT;
} finally {
@@ -393,18 +400,26 @@
}
}
- public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
+ public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
_mngmtLock.readLock().lock();
try {
- WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+ final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
BpelProcess process = _registeredProcesses.get(we.getProcessId());
if (process == null) {
// If the process is not active, it means that we should not be
// doing any work on its behalf, therefore we will reschedule the
// events for some time in the future (1 minute).
- Date future = new Date(System.currentTimeMillis() + (60 * 1000));
- __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
+ _contexts.execTransaction(new Callable<Void>() {
+
+ public Void call() throws Exception {
+ _contexts.scheduler.jobCompleted(jobInfo.jobName);
+ Date future = new Date(System.currentTimeMillis() + (60 * 1000));
+ __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
+ return null;
+ }
+
+ });
return;
}
@@ -481,7 +496,7 @@
switch (istyle) {
case ASYNC:
try {
- mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+ mexId = _contexts.execTransaction(createDao);
} catch (Exception e) {
__log.error("Internal Error: could not execute isolated transaction.", e);
throw new BpelEngineException("Internal Error", e);
@@ -490,7 +505,7 @@
break;
case BLOCKING:
try {
- mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+ mexId = _contexts.execTransaction(createDao);
} catch (Exception e) {
__log.error("Internal Error: could not execute isolated transaction.", e);
throw new BpelEngineException("Internal Error", e);
@@ -576,7 +591,7 @@
return loadMex.call();
// TODO: should we not do this in the current thread if the mex is a transacted/reliable?
- return _contexts.scheduler.execIsolatedTransaction(loadMex).get();
+ return execIsolatedTransaction(loadMex).get();
} catch (ContextException e) {
throw new BpelEngineException(e);
} catch (Exception e) {
@@ -621,6 +636,7 @@
return null;
}
+
void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
@@ -641,8 +657,27 @@
}
}
+
+    /**
+     * Submits the given work to the server's executor, where it is run inside its own
+     * transaction via {@code _contexts.execTransaction(...)}.
+     *
+     * @param transaction work to run in a transaction on an executor thread
+     * @return future yielding the value returned by the work
+     */
+    <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws ContextException {
+        final Callable<T> inOwnTransaction = new Callable<T>() {
+            public T call() throws Exception {
+                return _contexts.execTransaction(transaction);
+            }
+        };
+        return _exec.submit(inOwnTransaction);
+    }
+
+ /**
+ * Schedule a {@link Runnable} object for execution after the completion of the current transaction.
+ * Must be called within a transaction; the runnable is registered as a commit synchronizer.
+ *
+ * @param runnable task to register with the current transaction
+ * @throws BpelEngineException if no transaction is active
+ */
+ void scheduleRunnable(Runnable runnable) {
+ assertTransaction();
+ _contexts.registerCommitSynchronizer(runnable);
+ }
+
protected void assertTransaction() {
- if (!_contexts.scheduler.isTransacted())
+ if (!_contexts.isTransacted())
throw new BpelEngineException("Operation must be performed in a transaction!");
}
@@ -724,4 +759,6 @@
_mngmtLock.readLock().unlock();
}
}
+
+
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue Jul 24 13:58:12 2007
@@ -21,21 +21,30 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelEventListener;
+import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
/**
- * Aggregation of all the contexts provided to the BPEL engine by the
- * integration layer.
+ * Aggregation of all the contexts provided to the BPEL engine by the integration layer.
*/
class Contexts {
+ TransactionManager txManager;
+
MessageExchangeContext mexContext;
Scheduler scheduler;
@@ -46,11 +55,84 @@
BpelDAOConnectionFactory dao;
- /** Global Message-Exchange interceptors. Must be copy-on-write!!! */
- final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
+ /** Global Message-Exchange interceptors. Must be copy-on-write!!! */
+ final List<MessageExchangeInterceptor> globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
/** Global event listeners. Must be copy-on-write!!! */
final List<BpelEventListener> eventListeners = new CopyOnWriteArrayList<BpelEventListener>();
+    /**
+     * Tells whether the transaction manager reports an active transaction.
+     *
+     * @return <code>true</code> if the transaction manager's status is <code>STATUS_ACTIVE</code>
+     * @throws BpelEngineException if the transaction manager cannot report its status
+     */
+    public boolean isTransacted() {
+        final int txStatus;
+        try {
+            txStatus = txManager.getStatus();
+        } catch (SystemException e) {
+            throw new BpelEngineException(e);
+        }
+        return txStatus == Status.STATUS_ACTIVE;
+    }
+
+    /**
+     * Runs the given {@link Runnable} inside a transaction by adapting it to a {@link Callable}
+     * and delegating to the callable-based overload.
+     *
+     * @param transaction work to perform within the transaction
+     * @throws BpelEngineException wrapping any exception raised by the delegate
+     */
+    public void execTransaction(final Runnable transaction) {
+        final Callable<Void> adapter = new Callable<Void>() {
+            public Void call() throws Exception {
+                transaction.run();
+                return null;
+            }
+        };
+        try {
+            execTransaction(adapter);
+        } catch (Exception e) {
+            throw new BpelEngineException(e);
+        }
+    }
+
+    /**
+     * Runs the given unit of work inside a new transaction: begins a transaction on
+     * {@link #txManager}, invokes the callable, then commits if it returned normally or rolls
+     * back if it threw.
+     *
+     * @param transaction work to perform within the transaction
+     * @return the value returned by the callable
+     * @throws BpelEngineException if the transaction cannot be begun or committed, or if the
+     *             rollback itself fails (the failure that triggered the rollback, when there is
+     *             one, is chained as the cause)
+     * @throws Exception whatever the callable threw, once the transaction has been rolled back
+     */
+    public <T> T execTransaction(Callable<T> transaction) throws Exception {
+        try {
+            txManager.begin();
+        } catch (Exception ex) {
+            String errmsg = "Internal Error, could not begin transaction.";
+            throw new BpelEngineException(errmsg, ex);
+        }
+        boolean success = false;
+        // Remembered so that a failing rollback cannot hide what went wrong in the first place.
+        Exception failure = null;
+        try {
+            T retval = transaction.call();
+            success = true;
+            return retval;
+        } catch (Exception ex) {
+            failure = ex;
+            throw ex;
+        } finally {
+            if (success) {
+                try {
+                    txManager.commit();
+                } catch (Exception ex) {
+                    throw new BpelEngineException("Could not commit.", ex);
+                }
+            } else {
+                try {
+                    txManager.rollback();
+                } catch (Exception ex) {
+                    if (failure == null) {
+                        // The work ended with a non-Exception throwable; report the rollback problem.
+                        throw new BpelEngineException("Could not rollback.", ex);
+                    }
+                    // Same exception type as before, but chained to the root cause rather than
+                    // replacing it with the rollback error.
+                    throw new BpelEngineException("Could not rollback. Rollback failure: " + ex, failure);
+                }
+            }
+        }
+    }
+
+    /**
+     * Registers a callback with the current transaction that runs the given task once the
+     * transaction has completed with status <code>STATUS_COMMITTED</code>; for any other
+     * completion status the task is not run.
+     *
+     * @param runnable task to run after a successful commit
+     * @throws BpelEngineException if the synchronization cannot be registered
+     */
+    public void registerCommitSynchronizer(final Runnable runnable) {
+        final Synchronization onCommit = new Synchronization() {
+
+            public void beforeCompletion() {
+                // nothing to do before the transaction completes
+            }
+
+            public void afterCompletion(int status) {
+                if (status != Status.STATUS_COMMITTED)
+                    return;
+                runnable.run();
+            }
+
+        };
+        try {
+            txManager.getTransaction().registerSynchronization(onCommit);
+        } catch (Exception ex) {
+            throw new BpelEngineException("Error registering synchronizer." ,ex);
+        }
+    }
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Tue Jul 24 13:58:12 2007
@@ -80,8 +80,7 @@
*/
DebuggerSupport(BpelProcess process) {
_process = process;
- _db = new BpelProcessDatabase(_process._contexts.dao,
- _process._contexts.scheduler,
+ _db = new BpelProcessDatabase(_process._contexts,
_process._pid);
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -401,7 +401,7 @@
}
protected void assertTransaction() {
- if (!_contexts.scheduler.isTransacted())
+ if (!_contexts.isTransacted())
throw new BpelEngineException("Operation must be performed in a transaction!");
}
@@ -411,7 +411,7 @@
return action.call(getDAO());
} else {
try {
- return _contexts.scheduler.execIsolatedTransaction(new Callable<T>() {
+ return _process._server.execIsolatedTransaction(new Callable<T>() {
public T call() throws Exception {
assertTransaction();
return action.call(getDAO());
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -100,7 +100,7 @@
protected void scheduleInvoke(BpelProcess target) {
assert !_process.isInMemory() : "Cannot schedule invokes for in-memory processes.";
- assert _contexts.scheduler.isTransacted() : "Cannot schedule outside of transaction context.";
+ assert _contexts.isTransacted() : "Cannot schedule outside of transaction context.";
// Schedule a new job for invocation
final WorkEvent we = new WorkEvent();
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Tue Jul 24 13:58:12 2007
@@ -32,10 +32,12 @@
import org.apache.ode.bpel.evt.CorrelationNoMatchEvent;
import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.o.OMessageVarType;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -86,7 +88,7 @@
* @param mex
* exchange to which the message is related
*/
- public void invokeMyRole(MessageExchangeDAO mex) {
+ public CorrelationStatus invokeMyRole(MessageExchangeDAO mex) {
if (__log.isTraceEnabled()) {
__log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex }));
}
@@ -97,151 +99,134 @@
mex.setStatus(Status.FAILURE.toString());
mex.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
mex.setFaultExplanation(mex.getOperation());
- return;
+ return null;
}
// Is this a /possible/ createInstance Operation?
boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
-
- // now, the tricks begin: when a message arrives we have to see if there is anyone waiting for it. Get the correlator, a
- // persisted communnication-reduction data structure supporting correlation correlationKey matching!
String correlatorId = BpelProcess.genCorrelatorId(_plinkDef, operation.getName());
-
CorrelatorDAO correlator = _process.getProcessDAO().getCorrelator(correlatorId);
- CorrelationKey[] keys;
- MessageRouteDAO messageRoute = null;
+ // Special logic for in-mem processes, only createInstance is allowed, so we can skip the
+ // correlation BS to save time.
+ if (_process.isInMemory()) {
+ if (isCreateInstnace)
+ invokeMyRoleCreateInstance(mex, operation, correlatorId, correlator);
+ else {
+ mex.setStatus(Status.FAILURE.toString());
+ mex.setFailureType(MessageExchange.FailureType.OTHER.toString());
+ mex.setFaultExplanation("Invalid in-memory process: non createInstance operations are not supported!");
+ return null;
+ }
- // We need to compute the correlation keys (based on the operation
- // we can infer which correlation keys to compute - this is merely a set
- // consisting of each correlationKey used in each correlation sets
- // that is ever referenced in an <receive>/<onMessage> on this
- // partnerlink/operation.
- try {
- keys = computeCorrelationKeys(mex, operation);
- } catch (InvalidMessageException ime) {
- // We'd like to do a graceful exit here, no sense in rolling back due to a
- // a message format problem.
- __log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
- mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
- mex.setStatus(Status.FAILURE.toString());
- mex.setFaultExplanation(ime.getMessage());
+ } else {
- return;
- }
+ MessageRouteDAO messageRoute = null;
- String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
- String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
- + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
- }
+ // now, the tricks begin: when a message arrives we have to see if there is anyone waiting for it. Get the correlator, a
+ // persisted communnication-reduction data structure supporting correlation correlationKey matching!
- CorrelationKey matchedKey = null;
+ CorrelationKey[] keys;
- // Try to find a route for one of our keys.
- for (CorrelationKey key : keys) {
- messageRoute = correlator.findRoute(key);
- if (messageRoute != null) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
- }
- matchedKey = key;
- break;
- }
- }
+ // We need to compute the correlation keys (based on the operation
+ // we can infer which correlation keys to compute - this is merely a set
+ // consisting of each correlationKey used in each correlation sets
+ // that is ever referenced in an <receive>/<onMessage> on this
+ // partnerlink/operation.
+ try {
+ keys = computeCorrelationKeys(mex, operation);
+ } catch (InvalidMessageException ime) {
+ // We'd like to do a graceful exit here, no sense in rolling back due to a
+ // a message format problem.
+ __log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
+ mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
+ mex.setStatus(Status.FAILURE.toString());
+ mex.setFaultExplanation(ime.getMessage());
- // TODO - ODE-58
+ return null;
+ }
- // If no luck, and this operation qualifies for create-instance
- // treatment, then create a new process
- // instance.
- if (messageRoute == null && isCreateInstnace) {
+ String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+ String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
+ __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
+ + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
}
- ProcessDAO processDAO = _process.getProcessDAO();
- if (_process._pconf.getState() == ProcessState.RETIRED) {
- throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+ CorrelationKey matchedKey = null;
+
+ // Try to find a route for one of our keys.
+ for (CorrelationKey key : keys) {
+ messageRoute = correlator.findRoute(key);
+ if (messageRoute != null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
+ }
+ matchedKey = key;
+ break;
+ }
}
- // if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
- // __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
- // return;
- // }
-
- ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
-
- BpelRuntimeContextImpl instance = _process.createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
-
- // send process instance event
- NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace, _process
- .getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
- evt.setPortType(mex.getPortType());
- evt.setOperation(operation.getName());
- evt.setMexId(mex.getMessageExchangeId());
- _process._debugger.onEvent(evt);
- _process.saveEvent(evt, newInstance);
- mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
- mex.setInstance(newInstance);
+ // TODO - ODE-58
- instance.execute();
- } else if (messageRoute != null) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
- + messageRoute.getTargetInstance().getInstanceId());
- }
+ // If no luck, and this operation qualifies for create-instance
+ // treatment, then create a new process
+ // instance.
+ if (messageRoute == null && isCreateInstnace) {
+ invokeMyRoleCreateInstance(mex, operation, correlatorId, correlator);
+ } else if (messageRoute != null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
+ + messageRoute.getTargetInstance().getInstanceId());
+ }
- ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
+ ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
- // Reload process instance for DAO.
- BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
- instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
-
- // Kill the route so some new message does not get routed to
- // same process instance.
- correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
-
- // send process instance event
- CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace, _process
- .getOProcess().getName()), _process.getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
- evt.setPortType(mex.getPortType());
- evt.setOperation(operation.getName());
- evt.setMexId(mex.getMessageExchangeId());
-
- _process._debugger.onEvent(evt);
- // store event
- _process.saveEvent(evt, instanceDao);
-
- mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
- mex.setInstance(messageRoute.getTargetInstance());
- instance.execute();
- } else {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
+ // Reload process instance for DAO.
+
+ // Kill the route so some new message does not get routed to
+ // same process instance.
+ correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
+
+ // send process instance event
+ CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace, _process
+ .getOProcess().getName()), _process.getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
+ evt.setPortType(mex.getPortType());
+ evt.setOperation(operation.getName());
+ evt.setMexId(mex.getMessageExchangeId());
+
+ _process._debugger.onEvent(evt);
+ // store event
+ _process.saveEvent(evt, instanceDao);
+
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
+ mex.setInstance(messageRoute.getTargetInstance());
+
+ // We're overloading the channel here to be the PICK response channel + index
+ mex.setChannel(messageRoute.getGroupId() + "&" + messageRoute.getIndex());
+ } else {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
+ }
+
+ // TODO: Revist (BART)
+ // if (!mex.isAsynchronous()) {
+ // mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
+ //
+ // } else {
+ // send event
+ CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType(), mex.getOperation(), mex
+ .getMessageExchangeId(), keys);
+
+ evt.setProcessId(_process.getProcessDAO().getProcessId());
+ evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+ _process._debugger.onEvent(evt);
+
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED.toString());
+ correlator.enqueueMessage(mex, keys);
}
- // TODO: Revist (BART)
- // if (!mex.isAsynchronous()) {
- // mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
- //
- // } else {
- // send event
- CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType(), mex.getOperation(), mex
- .getMessageExchangeId(), keys);
-
- evt.setProcessId(_process.getProcessDAO().getProcessId());
- evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
- _process._debugger.onEvent(evt);
-
- mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED.toString());
-
- // No match, means we add message exchange to the queue.
- correlator.enqueueMessage(mex, keys);
- // }
}
-
// Now we have to update our message exchange status. If the <reply>
// was not hit during the
// invocation, then we will be in the "REQUEST" phase which means
@@ -250,6 +235,39 @@
if (Status.valueOf(mex.getStatus()) == MessageExchange.Status.REQUEST) {
mex.setStatus(MessageExchange.Status.ASYNC.toString());
}
+
+ return CorrelationStatus.valueOf(mex.getCorrelationStatus());
+ }
+
+ private void invokeMyRoleCreateInstance(MessageExchangeDAO mex, Operation operation, String correlatorId,
+ CorrelatorDAO correlator) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
+ }
+ ProcessDAO processDAO = _process.getProcessDAO();
+
+ if (_process._pconf.getState() == ProcessState.RETIRED) {
+ throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+ }
+
+ // if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
+ // __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
+ // return;
+ // }
+
+ ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
+
+ // send process instance event
+ NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace, _process
+ .getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
+ evt.setPortType(mex.getPortType());
+ evt.setOperation(operation.getName());
+ evt.setMexId(mex.getMessageExchangeId());
+ _process._debugger.onEvent(evt);
+ _process.saveEvent(evt, newInstance);
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+ mex.setInstance(newInstance);
+
}
@SuppressWarnings("unchecked")
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Tue Jul 24 13:58:12 2007
@@ -39,15 +39,4 @@
_initialPartner = initialPartner;
}
- public void processPartnerResponse(PartnerRoleMessageExchangeImpl messageExchange) {
- if (__log.isDebugEnabled()) {
- __log.debug("Processing partner's response for partnerLink: " + messageExchange);
- }
-
- BpelRuntimeContextImpl processInstance =
- _process.createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
- processInstance.invocationResponse(messageExchange);
- processInstance.execute();
- }
-
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -22,7 +22,7 @@
protected void checkReplyContextOk() {
super.checkReplyContextOk();
- if (!_contexts.scheduler.isTransacted())
+ if (!_contexts.isTransacted())
throw new BpelEngineException("Cannot replyXXX from non-transaction context!");
}
@@ -40,7 +40,7 @@
@Override
protected void resumeInstance() {
// TODO Auto-generated method stub
- assert _contexts.scheduler.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
+ assert _contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
assert !_process.isInMemory() : "resumeInstance() for reliable in-mem processes makes no sense.";
final WorkEvent we = generateInvokeResponseWorkEvent();
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -23,15 +23,11 @@
public Status invokeTransacted() throws BpelEngineException {
assertTransaction();
- boolean success = false;
- try {
- _process.invokeProcess(getDAO());
- if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE)
- throw new BpelEngineException("Transactional invoke on process did not yield a response.");
- success = true;
- } finally {
-
- }
+ _process.invokeProcess(getDAO());
+ if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE)
+ throw new BpelEngineException("Transactional invoke on process did not yield a response.");
+ return Status.valueOf(getDAO().getStatus());
+
}
@Override
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java Tue Jul 24 13:58:12 2007
@@ -22,6 +22,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.iapi.Scheduler;
+import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import java.util.HashMap;
import java.util.Map;
@@ -33,14 +34,14 @@
public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
private static final Map<QName, ProcessDaoImpl> __StateStore = new HashMap<QName, ProcessDaoImpl>();
- private Scheduler _scheduler;
+ private TransactionManager _txm;
- public BpelDAOConnectionFactoryImpl(Scheduler sched) {
- _scheduler = sched;
+ public BpelDAOConnectionFactoryImpl(TransactionManager txm) {
+ _txm = txm;
}
public BpelDAOConnection getConnection() {
- return new BpelDAOConnectionImpl(__StateStore, _scheduler);
+ return new BpelDAOConnectionImpl(__StateStore, _txm);
}
/**
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Tue Jul 24 13:58:12 2007
@@ -35,6 +35,10 @@
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.UnaryFunction;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Collection;
@@ -47,22 +51,26 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-
/**
* A very simple, in-memory implementation of the {@link BpelDAOConnection} interface.
*/
class BpelDAOConnectionImpl implements BpelDAOConnection {
private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
- private Scheduler _scheduler;
+ private TransactionManager _txm;
+
private Map<QName, ProcessDaoImpl> _store;
+
private List<BpelEvent> _events = new LinkedList<BpelEvent>();
- private static Map<String,MessageExchangeDAO> _mexStore = Collections.synchronizedMap(new HashMap<String,MessageExchangeDAO>());
+
+ private static Map<String, MessageExchangeDAO> _mexStore = Collections
+ .synchronizedMap(new HashMap<String, MessageExchangeDAO>());
+
private static AtomicLong counter = new AtomicLong(Long.MAX_VALUE / 2);
- BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, Scheduler scheduler) {
+ BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, TransactionManager txm) {
_store = store;
- _scheduler = scheduler;
+ _txm = txm;
}
public ProcessDAO getProcess(QName processId) {
@@ -70,8 +78,8 @@
}
public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
- ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type, guid,version);
- _store.put(pid,process);
+ ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version);
+ _store.put(pid, process);
return process;
}
@@ -85,13 +93,12 @@
}
public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) {
- if(filter.getLimit()==0) {
+ if (filter.getLimit() == 0) {
return Collections.EMPTY_LIST;
}
List<ProcessInstanceDAO> matched = new ArrayList<ProcessInstanceDAO>();
// Selecting
- selectionCompleted:
- for (ProcessDaoImpl proc : _store.values()) {
+ selectionCompleted: for (ProcessDaoImpl proc : _store.values()) {
boolean pmatch = true;
if (filter.getNameFilter() != null
&& !equalsOrWildcardMatch(filter.getNameFilter(), proc.getProcessId().getLocalPart()))
@@ -107,9 +114,11 @@
if (filter.getStatusFilter() != null) {
boolean statusMatch = false;
for (Short status : filter.convertFilterState()) {
- if (inst.getState() == status.byteValue()) statusMatch = true;
+ if (inst.getState() == status.byteValue())
+ statusMatch = true;
}
- if (!statusMatch) match = false;
+ if (!statusMatch)
+ match = false;
}
if (filter.getStartedDateFilter() != null
&& !dateMatch(filter.getStartedDateFilter(), inst.getCreateTime(), filter))
@@ -118,25 +127,25 @@
&& !dateMatch(filter.getLastActiveDateFilter(), inst.getLastActiveTime(), filter))
match = false;
-// if (filter.getPropertyValuesFilter() != null) {
-// for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
-// boolean entryMatched = false;
-// for (ProcessPropertyDAO prop : proc.getProperties()) {
-// if (prop.getName().equals(propEntry.getKey())
-// && (propEntry.getValue().equals(prop.getMixedContent())
-// || propEntry.getValue().equals(prop.getSimpleContent()))) {
-// entryMatched = true;
-// }
-// }
-// if (!entryMatched) {
-// match = false;
-// }
-// }
-// }
+ // if (filter.getPropertyValuesFilter() != null) {
+ // for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
+ // boolean entryMatched = false;
+ // for (ProcessPropertyDAO prop : proc.getProperties()) {
+ // if (prop.getName().equals(propEntry.getKey())
+ // && (propEntry.getValue().equals(prop.getMixedContent())
+ // || propEntry.getValue().equals(prop.getSimpleContent()))) {
+ // entryMatched = true;
+ // }
+ // }
+ // if (!entryMatched) {
+ // match = false;
+ // }
+ // }
+ // }
if (match) {
matched.add(inst);
- if(matched.size()==filter.getLimit()) {
+ if (matched.size() == filter.getLimit()) {
break selectionCompleted;
}
}
@@ -149,9 +158,10 @@
Collections.sort(matched, new Comparator<ProcessInstanceDAO>() {
public int compare(ProcessInstanceDAO o1, ProcessInstanceDAO o2) {
- for (String orderKey: orders) {
+ for (String orderKey : orders) {
int result = compareInstanceUsingKey(orderKey, o1, o2);
- if (result != 0) return result;
+ if (result != 0)
+ return result;
}
return 0;
}
@@ -173,8 +183,8 @@
public MessageExchangeDAO createMessageExchange(char dir) {
String id = Long.toString(counter.getAndIncrement());
- MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir,id);
- _mexStore.put(id,mex);
+ MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir, id);
+ _mexStore.put(id, mex);
return mex;
}
@@ -189,7 +199,8 @@
String orderKey = key;
if (key.startsWith("+") || key.startsWith("-")) {
orderKey = key.substring(1, key.length());
- if (key.startsWith("-")) ascending = false;
+ if (key.startsWith("-"))
+ ascending = false;
}
ProcessDAO process1 = getProcess(instanceDAO1.getProcess().getProcessId());
ProcessDAO process2 = getProcess(instanceDAO2.getProcess().getProcessId());
@@ -203,11 +214,11 @@
s1 = process1.getProcessId().getNamespaceURI();
s2 = process2.getProcessId().getNamespaceURI();
} else if ("version".equals(orderKey)) {
- s1 = ""+process1.getVersion();
- s2 = ""+process2.getVersion();
+ s1 = "" + process1.getVersion();
+ s2 = "" + process2.getVersion();
} else if ("status".equals(orderKey)) {
- s1 = ""+instanceDAO1.getState();
- s2 = ""+instanceDAO2.getState();
+ s1 = "" + instanceDAO1.getState();
+ s2 = "" + instanceDAO2.getState();
} else if ("started".equals(orderKey)) {
s1 = ISO8601DateParser.format(instanceDAO1.getCreateTime());
s2 = ISO8601DateParser.format(instanceDAO2.getCreateTime());
@@ -215,62 +226,71 @@
s1 = ISO8601DateParser.format(instanceDAO1.getLastActiveTime());
s2 = ISO8601DateParser.format(instanceDAO2.getLastActiveTime());
}
- if (ascending) return s1.compareTo(s2);
- else return s2.compareTo(s1);
+ if (ascending)
+ return s1.compareTo(s2);
+ else
+ return s2.compareTo(s1);
}
private boolean equalsOrWildcardMatch(String s1, String s2) {
- if (s1 == null || s2 == null) return false;
- if (s1.equals(s2)) return true;
+ if (s1 == null || s2 == null)
+ return false;
+ if (s1.equals(s2))
+ return true;
if (s1.endsWith("*")) {
- if (s2.startsWith(s1.substring(0, s1.length() - 1))) return true;
+ if (s2.startsWith(s1.substring(0, s1.length() - 1)))
+ return true;
}
if (s2.endsWith("*")) {
- if (s1.startsWith(s2.substring(0, s2.length() - 1))) return true;
+ if (s1.startsWith(s2.substring(0, s2.length() - 1)))
+ return true;
}
return false;
}
- public boolean dateMatch(List<String> dateFilters, Date instanceDate, InstanceFilter filter) {
+ public boolean dateMatch(List<String> dateFilters, Date instanceDate, InstanceFilter filter) {
boolean match = true;
for (String ddf : dateFilters) {
String isoDate = ISO8601DateParser.format(instanceDate);
String critDate = Filter.getDateWithoutOp(ddf);
if (ddf.startsWith("=")) {
- if (!isoDate.startsWith(critDate)) match = false;
+ if (!isoDate.startsWith(critDate))
+ match = false;
} else if (ddf.startsWith("<=")) {
- if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0) match = false;
+ if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0)
+ match = false;
} else if (ddf.startsWith(">=")) {
- if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0) match = false;
+ if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0)
+ match = false;
} else if (ddf.startsWith("<")) {
- if (isoDate.compareTo(critDate) > 0) match = false;
+ if (isoDate.compareTo(critDate) > 0)
+ match = false;
} else if (ddf.startsWith(">")) {
- if (isoDate.compareTo(critDate) < 0) match = false;
+ if (isoDate.compareTo(critDate) < 0)
+ match = false;
}
}
return match;
}
-
public ScopeDAO getScope(Long siidl) {
for (ProcessDaoImpl process : _store.values()) {
for (ProcessInstanceDAO instance : process._instances.values()) {
- if (instance.getScope(siidl) != null) return instance.getScope(siidl);
+ if (instance.getScope(siidl) != null)
+ return instance.getScope(siidl);
}
}
return null;
}
-
public void insertBpelEvent(BpelEvent event, ProcessDAO processConfiguration, ProcessInstanceDAO instance) {
_events.add(event);
}
-
public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
// TODO : Provide more correct implementation:
ArrayList<Date> dates = new ArrayList<Date>();
- CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent,Date>() {
+ CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent, Date>() {
public Date apply(BpelEvent x) {
return x.getTimestamp();
}
@@ -278,7 +298,6 @@
return dates;
}
-
public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
// TODO : Provide a more correct (filtering) implementation:
return _events;
@@ -288,7 +307,7 @@
* @see org.apache.ode.bpel.dao.BpelDAOConnection#instanceQuery(String)
*/
public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
- //TODO
+ // TODO
throw new UnsupportedOperationException();
}
@@ -301,12 +320,18 @@
}
public void defer(final Runnable runnable) {
- _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
- public void afterCompletion(boolean success) {
- }
- public void beforeCompletion() {
- runnable.run();
- }
- });
+ try {
+ _txm.getTransaction().registerSynchronization(new Synchronization() {
+ public void afterCompletion(int status) {
+ }
+
+ public void beforeCompletion() {
+ runnable.run();
+ }
+ });
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Jul 24 13:58:12 2007
@@ -18,21 +18,35 @@
*/
package org.apache.ode.bpel.runtime;
+import java.io.File;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
import org.apache.ode.il.EmbeddedGeronimoFactory;
import org.apache.ode.il.MockScheduler;
@@ -44,21 +58,6 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
class MockBpelServer {
@@ -72,8 +71,8 @@
EndpointReferenceContext _eprContext;
MessageExchangeContext _mexContext;
BindingContext _bindContext;
- HashMap<String, QName> _activated = new HashMap();
- HashMap _endpoints = new HashMap();
+ HashMap<String, QName> _activated = new HashMap<String,QName>();
+ HashMap<String, EndpointReference> _endpoints = new HashMap<String, EndpointReference>();
public MockBpelServer() {
try {
@@ -85,7 +84,6 @@
if (_daoCF == null)
throw new RuntimeException("No DAO");
_server.setDaoConnectionFactory(_daoCF);
- _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
if (_scheduler == null)
throw new RuntimeException("No scheduler");
_store = new ProcessStoreImpl(_dataSource,"jpa", true);
@@ -115,7 +113,7 @@
MyRoleMessageExchange mex;
_txManager.begin();
- mex = _server.getEngine().createMessageExchange("" + messageId, serviceName, opName);
+ mex = _server.createMessageExchange(InvocationStyle.ASYNC,serviceName, opName, "" + messageId);
if (mex.getOperation() == null)
throw new Exception("Did not find operation " + opName + " on service " + serviceName);
Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
@@ -124,7 +122,8 @@
Element message = body.getOwnerDocument().createElementNS("", "message");
message.appendChild(wrapper);
request.setMessage(message);
- mex.invoke(request);
+ mex.setRequest(request);
+ mex.invokeAsync();
mex.complete();
_txManager.commit();
} catch (Exception except) {
@@ -216,6 +215,34 @@
_mexContext = new MessageExchangeContext() {
public void invokePartner(PartnerRoleMessageExchange mex) { }
public void onAsyncReply(MyRoleMessageExchange myRoleMex) { }
+ public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+ public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+ public void invokePartnerBlocking(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+ public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+ public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+ public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+ // TODO Auto-generated method stub
+
+ }
};
return _mexContext;
}
@@ -267,9 +294,7 @@
long _nextSchedule;
SchedulerWrapper(BpelServerImpl server, TransactionManager txManager, DataSource dataSource) {
- ExecutorService executorService = Executors.newCachedThreadPool();
_scheduler = new MockScheduler(_txManager);
- _scheduler.setExecutorSvc(executorService);
_scheduler.setJobProcessor(server);
}
@@ -279,39 +304,24 @@
return jobId;
}
- public String scheduleVolatileJob(boolean transacted, Map<String,Object> jobDetail) throws ContextException {
- String jobId = _scheduler.scheduleVolatileJob(transacted, jobDetail);
- _nextSchedule = System.currentTimeMillis();
- return jobId;
- }
-
public void cancelJob(String jobId) throws ContextException {
_scheduler.cancelJob(jobId);
}
- public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
- return _scheduler.execTransaction(transaction);
- }
-
- public <T> Future<T> execIsolatedTransaction(Callable<T> transaction) throws Exception, ContextException {
- return _scheduler.execIsolatedTransaction(transaction);
- }
-
- public boolean isTransacted() {
- return _scheduler.isTransacted();
- }
public void start() { _scheduler.start(); }
public void stop() { _scheduler.stop(); }
public void shutdown() { _scheduler.shutdown(); }
- public void registerSynchronizer(Synchronizer synch) throws ContextException {
- _scheduler.registerSynchronizer(synch);
- }
public void setJobProcessor(JobProcessor processor) throws ContextException {
_scheduler.setJobProcessor(processor);
+ }
+
+ public void jobCompleted(String jobId) {
+ _scheduler.jobCompleted(jobId);
+
}
}