You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/06 07:46:57 UTC
svn commit: r573153 [4/9] - in /ode/trunk: ./
axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/
bpel-api/src/main/java/org/apache/ode/bpel/explang/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-api/src/main/java/org/apache/ode/bpel/pmapi/...
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Sep 5 22:46:42 2007
@@ -18,6 +18,16 @@
*/
package org.apache.ode.bpel.engine;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.CorrelationKey;
@@ -27,7 +37,6 @@
import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.MessageRouteDAO;
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -36,17 +45,20 @@
import org.apache.ode.bpel.evt.*;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.o.OElementVarType;
+import org.apache.ode.bpel.o.OMessageVarType;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.bpel.o.OScope;
import org.apache.ode.bpel.o.OMessageVarType.Part;
-import org.apache.ode.bpel.o.*;
import org.apache.ode.bpel.runtime.BpelJacobRunnable;
import org.apache.ode.bpel.runtime.BpelRuntimeContext;
import org.apache.ode.bpel.runtime.CorrelationSetInstance;
@@ -71,15 +83,11 @@
import org.w3c.dom.Node;
import org.w3c.dom.Document;
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-
+/**
+ *
+ *
+ * @author Maciej Szefler
+ */
class BpelRuntimeContextImpl implements BpelRuntimeContext {
private static final Log __log = LogFactory.getLog(BpelRuntimeContextImpl.class);
@@ -96,58 +104,79 @@
/** JACOB ExecutionQueue (state) */
protected ExecutionQueueImpl _soup;
- private MyRoleMessageExchangeImpl _instantiatingMessageExchange;
+ private MessageExchangeDAO _instantiatingMessageExchange;
- private OutstandingRequestManager _outstandingRequests;
+ private BpelInstanceWorker _instanceWorker;
private BpelProcess _bpelProcess;
/** Five second maximum for continous execution. */
private long _maxReductionTimeMs = 2000000;
- public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
- MyRoleMessageExchangeImpl instantiatingMessageExchange) {
- _bpelProcess = bpelProcess;
- _dao = dao;
- _iid = dao.getInstanceId();
- _instantiatingMessageExchange = instantiatingMessageExchange;
- _vpu = new JacobVPU();
- _vpu.registerExtension(BpelRuntimeContext.class, this);
+ private Contexts _contexts;
- _soup = new ExecutionQueueImpl(null);
- _soup.setReplacementMap(_bpelProcess.getReplacementMap());
- _outstandingRequests = new OutstandingRequestManager();
- _vpu.setContext(_soup);
+ private boolean _executed;
- if (bpelProcess.isInMemory()) {
- ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
- if (inmem.getSoup() != null) {
- _soup = (ExecutionQueueImpl) inmem.getSoup();
- _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
- _vpu.setContext(_soup);
- }
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao) {
+ this(instanceWorker, dao, new ExecutionQueueImpl(null));
+
+ // The following allows us to skip deserialization of the soup if our execution state in memory is the same
+ // as that in the database.
+ Object cachedState = instanceWorker.getCachedState(dao.getExecutionStateCounter());
+ if (cachedState != null) {
+ if (__log.isDebugEnabled())
+ __log.debug("CACHE HIT: Using cached state #" + dao.getExecutionStateCounter() + " to resume instance " + dao.getInstanceId());
+ _soup = (ExecutionQueueImpl) cachedState;
+ _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+ _vpu.setContext(_soup);
} else {
+ if (__log.isDebugEnabled())
+ __log.debug("CACHE MISS: Loading state to resume instance " + dao.getInstanceId() + " from database ");
byte[] daoState = dao.getExecutionState();
- if (daoState != null) {
- ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
- try {
- _soup.read(iis);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
+ ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
+ try {
+ _soup.read(iis);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
}
+ }
- if (PROCESS != null) {
- _vpu.inject(PROCESS);
- }
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
+ MessageExchangeDAO instantiatingMessageExchange) {
+ this(instanceWorker, dao, new ExecutionQueueImpl(null));
+
+ if (PROCESS == null)
+ throw new NullPointerException();
+ if (instantiatingMessageExchange == null)
+ throw new NullPointerException();
+ _soup.setGlobalData(new OutstandingRequestManager());
+ _instantiatingMessageExchange = instantiatingMessageExchange;
+ _vpu.inject(PROCESS);
+
+ }
+
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, ExecutionQueueImpl soup) {
+ _instanceWorker = instanceWorker;
+ _bpelProcess = instanceWorker._process;
+ _contexts = instanceWorker._contexts;
+ _dao = dao;
+ _iid = dao.getInstanceId();
+ _vpu = new JacobVPU();
+ _vpu.registerExtension(BpelRuntimeContext.class, this);
+ _soup = soup;
+ _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+ _vpu.setContext(_soup);
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + _soup.getIndex());
}
}
+ public String toString() {
+ return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" + _iid + "}";
+ }
+
public Long getPid() {
return _iid;
}
@@ -189,8 +218,8 @@
BpelProcess.__log.debug("ProcessImpl completed with fault '" + faultData.getFaultName() + "'");
}
- _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(), faultData
- .getActivityId(), faultData.getFaultMessage());
+ _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(), faultData.getActivityId(),
+ faultData.getFaultMessage());
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
@@ -202,7 +231,7 @@
sendEvent(new ProcessCompletionEvent(faultData.getFaultName()));
_dao.finishCompletion();
- faultOutstandingMessageExchanges(faultData);
+ cleanupOutstandingMyRoleExchanges(faultData);
}
/**
@@ -223,17 +252,16 @@
sendEvent(new ProcessCompletionEvent(null));
_dao.finishCompletion();
- completeOutstandingMessageExchanges();
+ cleanupOutstandingMyRoleExchanges();
}
/**
- * @see BpelRuntimeContext#createScopeInstance(Long,
- * org.apache.ode.bpel.o.OScope)
+ * @see BpelRuntimeContext#createScopeInstance(Long, org.apache.ode.bpel.o.OScope)
*/
public Long createScopeInstance(Long parentScopeId, OScope scope) {
if (BpelProcess.__log.isTraceEnabled()) {
- BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("createScopeInstance", new Object[] {
- "parentScopeId", parentScopeId, "scope", scope }));
+ BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("createScopeInstance", new Object[] { "parentScopeId",
+ parentScopeId, "scope", scope }));
}
ScopeDAO parent = null;
@@ -255,8 +283,8 @@
ScopeDAO parent = _dao.getScope(parentScopeId);
for (OPartnerLink partnerLink : partnerLinks) {
- PartnerLinkDAO pdao = parent.createPartnerLink(partnerLink.getId(), partnerLink.name,
- partnerLink.myRoleName, partnerLink.partnerRoleName);
+ PartnerLinkDAO pdao = parent.createPartnerLink(partnerLink.getId(), partnerLink.name, partnerLink.myRoleName,
+ partnerLink.partnerRoleName);
// If there is a myrole on the link, initialize the session id so it
// is always
// available for opaque correlations. The myrole session id should
@@ -266,12 +294,11 @@
}
}
- public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance,
- Selector[] selectors) throws FaultException {
+ public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
+ throws FaultException {
if (BpelProcess.__log.isTraceEnabled())
BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("select", new Object[] { "pickResponseChannel",
- pickResponseChannel, "timeout", timeout, "createInstance", createInstance,
- "selectors", selectors }));
+ pickResponseChannel, "timeout", timeout, "createInstance", createInstance, "selectors", selectors }));
ProcessDAO processDao = _dao.getProcess();
@@ -297,12 +324,11 @@
correlators.add(processDao.getCorrelator(correlatorId));
}
- int conflict = _outstandingRequests.findConflict(selectors);
+ int conflict = getORM().findConflict(selectors);
if (conflict != -1)
- throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict]
- .toString());
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict].toString());
- _outstandingRequests.register(pickResponseChannelStr, selectors);
+ getORM().register(pickResponseChannelStr, selectors);
// TODO - ODE-58
@@ -314,48 +340,16 @@
for (int i = 0; i < correlators.size(); ++i) {
CorrelatorDAO ci = correlators.get(i);
if (ci.equals(_dao.getInstantiatingCorrelator())) {
- inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
+ injectMyRoleMessageExchange(pickResponseChannelStr, i, _instantiatingMessageExchange);
if (BpelProcess.__log.isDebugEnabled()) {
- BpelProcess.__log.debug("SELECT: " + pickResponseChannel
- + ": FOUND match for NEW instance mexRef=" + _instantiatingMessageExchange);
+ BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": FOUND match for NEW instance mexRef="
+ + _instantiatingMessageExchange);
}
return;
}
}
}
- // if (BpelProcess.__log.isDebugEnabled()) {
- // BpelProcess.__log.debug("SELECT: " + pickResponseChannel
- // + ": NEW instance match NOT FOUND; CHECKING MESSAGES. ");
- // }
- //
- //
- // for (int i = 0; i < selectors.length; ++i) {
- // CorrelatorDAO correlator = correlators.get(i);
- // Selector selector = selectors[i];
- // MessageExchangeDAO mexdao = correlator
- // .dequeueMessage(selector.correlationKey);
- // if (mexdao != null) {
- // // Found message matching one of our selectors.
- // if (BpelProcess.__log.isDebugEnabled()) {
- // BpelProcess.__log.debug("SELECT: " + pickResponseChannel
- // + ": FOUND match to MESSAGE " + mexdao + " on CKEY "
- // + selector.correlationKey);
- // }
- //
- // MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(
- // _bpelProcess._engine, mexdao);
- //
- // inputMsgMatch(pickResponseChannel.export(), i, mex);
- // return;
- // }
- // }
- //
- // if (BpelProcess.__log.isDebugEnabled()) {
- // BpelProcess.__log.debug("SELECT: " + pickResponseChannel
- // + ": MESSAGE match NOT FOUND.");
- // }
-
if (timeout != null) {
registerTimer(pickResponseChannel, timeout);
if (BpelProcess.__log.isDebugEnabled()) {
@@ -376,7 +370,6 @@
}
}
-
}
/**
@@ -404,15 +397,14 @@
XmlDataDAO dataDAO = scopeDAO.getVariable(variable.declaration.name);
if (dataDAO.isNull()) {
- throw new FaultException(_bpelProcess.getOProcess().constants.qnUninitializedVariable,
- "The variable " + variable.declaration.name + " isn't properly initialized.");
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnUninitializedVariable, "The variable "
+ + variable.declaration.name + " isn't properly initialized.");
}
return dataDAO.get();
}
- public Node fetchVariableData(VariableInstance var, OMessageVarType.Part part, boolean forWriting)
- throws FaultException {
+ public Node fetchVariableData(VariableInstance var, OMessageVarType.Part part, boolean forWriting) throws FaultException {
Node container = fetchVariableData(var, forWriting);
// If we want a specific part, we will need to navigate through the
@@ -450,9 +442,9 @@
}
/**
- * Evaluate a property alias query expression against a variable, returning
- * the normalized {@link String} representation of the property value.
- *
+ * Evaluate a property alias query expression against a variable, returning the normalized {@link String} representation of the
+ * property value.
+ *
* @param variable
* variable to read
* @param property
@@ -468,8 +460,7 @@
String val = _bpelProcess.extractProperty((Element) varData, alias, variable.declaration.getDescription());
if (BpelProcess.__log.isTraceEnabled()) {
- BpelProcess.__log.trace("readPropertyAlias(variable=" + variable + ", alias=" + alias + ") = "
- + val.toString());
+ BpelProcess.__log.trace("readPropertyAlias(variable=" + variable + ", alias=" + alias + ") = " + val.toString());
}
return val;
@@ -507,7 +498,7 @@
// We have an element
nodeQName = new QName(targetNode.getNamespaceURI(), targetNode.getLocalName());
}
- return _bpelProcess._engine._contexts.eprContext.convertEndpoint(nodeQName, sourceNode).toXML();
+ return _contexts.eprContext.convertEndpoint(nodeQName, sourceNode).toXML();
}
public void commitChanges(VariableInstance variable, Node changes) {
@@ -518,9 +509,9 @@
writeProperties(variable, changes, dataDAO);
}
- public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
- QName fault) throws FaultException {
- String mexRef = _outstandingRequests.release(plinkInstnace, opName, mexId);
+ public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg, QName fault)
+ throws FaultException {
+ String mexRef = getORM().release(plinkInstnace, opName, mexId);
if (mexRef == null) {
throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest);
@@ -532,61 +523,45 @@
evt.setOperation(opName);
evt.setPortType(plinkInstnace.partnerLink.myRolePortType.getQName());
- MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef);
+ // Get the "my-role" mex from the DB.
+ MessageExchangeDAO myrolemex = _dao.getConnection().getMessageExchange(mexRef);
- MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput()
- .getMessage().getQName());
+ Operation operation = plinkInstnace.partnerLink.getMyRoleOperation(opName);
+ if (operation == null || operation.getOutput() == null) {
+ // reply to operation that is either not defined or one-way
+ // Perhaps this should be detected at compile time?
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest,
+ "Undefined two-way operation \"" + opName + "\".");
+
+ }
+
+ // TODO what if msg==null? i.e. for a reply-with-fault.
+
+ MessageDAO message = myrolemex.createMessage(
+ operation.getOutput().getMessage()
+ .getQName());
message.setData(msg);
- MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mex);
- _bpelProcess.initMyRoleMex(m);
- m.setResponse(new MessageImpl(message));
+ myrolemex.setResponse(message);
+ AckType ackType;
if (fault != null) {
- mex.setStatus(MessageExchange.Status.FAULT.toString());
- mex.setFault(fault);
+ ackType = AckType.FAULT;
+ myrolemex.setFault(fault);
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
} else {
- mex.setStatus(MessageExchange.Status.RESPONSE.toString());
+ ackType = AckType.RESPONSE;
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
}
- if (mex.getPipedMessageExchangeId() != null) {
- PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _bpelProcess
- .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
- }
- try {
- switch (m.getStatus()) {
- case FAILURE:
- // We can't seem to get the failure out of the myrole mex?
- pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
- break;
- case FAULT:
- Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
- .getMessage().getQName());
- faultRes.setMessage(m.getResponse().getMessage());
- pmex.replyWithFault(m.getFault(), faultRes);
- break;
- case RESPONSE:
- Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
- response.setMessage(m.getResponse().getMessage());
- pmex.reply(response);
- break;
- default:
- __log.warn("Unexpected state: " + m.getStatus());
- break;
- }
- } finally {
- mex.release();
- }
- } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
-
- // send event
+ Status previousStatus = myrolemex.getStatus();
+ myrolemex.setStatus(Status.ACK);
+ myrolemex.setAckType(ackType);
+ _bpelProcess.onMyRoleMexAck(myrolemex, previousStatus);
sendEvent(evt);
}
+
/**
* @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
* org.apache.ode.bpel.common.CorrelationKey)
@@ -594,8 +569,7 @@
public void writeCorrelation(CorrelationSetInstance cset, CorrelationKey correlation) {
ScopeDAO scopeDAO = _dao.getScope(cset.scopeInstance);
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.declaration.name);
- OScope.CorrelationSet csetdef = (OScope.CorrelationSet) _bpelProcess.getOProcess()
- .getChild(correlation.getCSetId());
+ OScope.CorrelationSet csetdef = (OScope.CorrelationSet) _bpelProcess.getOProcess().getChild(correlation.getCSetId());
QName[] propNames = new QName[csetdef.properties.size()];
for (int m = 0; m < csetdef.properties.size(); m++) {
OProcess.OProperty oProperty = csetdef.properties.get(m);
@@ -610,14 +584,13 @@
}
/**
- * Common functionality to initialize a correlation set based on data
- * available in a variable.
- *
+ * Common functionality to initialize a correlation set based on data available in a variable.
+ *
* @param cset
* the correlation set instance
* @param variable
* variable instance
- *
+ *
* @throws IllegalStateException
* DOCUMENTME
*/
@@ -663,64 +636,59 @@
sendEvent(evt);
sendEvent(new ProcessTerminationEvent());
- failOutstandingMessageExchanges();
+ cleanupOutstandingMyRoleExchanges();
}
public void registerTimer(TimerResponseChannel timerChannel, Date timeToFire) {
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
+ we.setProcessId(_bpelProcess.getPID());
we.setChannel(timerChannel.export());
we.setType(WorkEvent.Type.TIMER);
- we.setInMem(_bpelProcess.isInMemory());
- _bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+ _bpelProcess.scheduleWorkEvent(we, timeToFire);
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) {
+
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
+ we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.MATCHER);
we.setCorrelatorId(correlatorId);
we.setCorrelationKey(key);
- we.setInMem(_bpelProcess.isInMemory());
- _bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+ _bpelProcess.scheduleWorkEvent(we, null);
}
public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
- InvokeResponseChannel channel) throws FaultException {
+ InvokeResponseChannel channel) throws FaultException {
+
+ // TODO: move a lot of this into BpelProcess
+
+ // TODO: think we should move the dao creation into bpelprocess --mbs
+ MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
+ MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+ mexDao.setStatus(MessageExchange.Status.NEW);
+ mexDao.setOperation(operation.getName());
+ mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
+ mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
+ PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
- // The target (partner endpoint) -- if it has not been explicitly
- // initialized
- // then use the value from bthe deployment descriptor ..
+
Element partnerEPR = plinkDAO.getPartnerEPR();
+
EndpointReference partnerEpr;
if (partnerEPR == null) {
- partnerEpr = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink);
+ partnerEpr = partnerRoleChannel.getInitialEndpointReference();
// In this case, the partner link has not been initialized.
if (partnerEpr == null)
throw new FaultException(partnerLink.partnerLink.getOwner().constants.qnUninitializedPartnerRole);
} else {
- partnerEpr = _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(partnerEPR);
- }
-
- if (BpelProcess.__log.isDebugEnabled()) {
- BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink +
- ", op=" + operation.getName() + " channel=" + channel + ")");
+ partnerEpr = _contexts.eprContext.resolveEndpointReference(partnerEPR);
}
-
- // prepare event
- ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
- evt.setOperation(operation.getName());
- evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
- evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
-
- MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(
- MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
- mexDao.setStatus(MessageExchange.Status.NEW.toString());
- mexDao.setOperation(operation.getName());
- mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
- mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
+
+ mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
mexDao.setPartnerLink(plinkDAO);
mexDao.setProcess(_dao.getProcess());
mexDao.setInstance(_dao);
@@ -728,145 +696,75 @@
: MessageExchangePattern.REQUEST_ONLY).toString());
mexDao.setChannel(channel == null ? null : channel.export());
- // Properties used by stateful-exchange protocol.
- String mySessionId = plinkDAO.getMySessionId();
- String partnerSessionId = plinkDAO.getPartnerSessionId();
-
- if ( mySessionId != null )
- mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
- if ( partnerSessionId != null )
- mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
-
- if (__log.isDebugEnabled())
- __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
-
MessageDAO message = mexDao.createMessage(operation.getInput().getMessage().getQName());
mexDao.setRequest(message);
+ mexDao.setTimeout(30000);
message.setData(outgoingMessage);
message.setType(operation.getInput().getMessage().getQName());
- // Get he my-role EPR (if myrole exists) for optional use by partner
- // (for callback mechanism).
- EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess
- .getInitialMyRoleEPR(partnerLink.partnerLink) : null;
- PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao,
- partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, _bpelProcess
- .getPartnerRoleChannel(partnerLink.partnerLink));
-
- BpelProcess p2pProcess = null;
- Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink);
- if (partnerEndpoint != null)
- p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, mex.getRequest());
-
- if (p2pProcess != null) {
- // Creating a my mex using the same message id as partner mex to "pipe" them
- MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
- mex.getMessageExchangeId(), partnerEndpoint.serviceName,
- operation.getName(), mex.getMessageExchangeId());
-
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Invoking in a p2p interaction, partnerrole " + mex + " - myrole " + myRoleMex);
- }
-
- Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
- odeRequest.setMessage(outgoingMessage);
-
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Setting myRoleMex session ids for p2p interaction, mySession "
- + partnerSessionId + " - partnerSess " + mySessionId);
- }
- if ( partnerSessionId != null )
- myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId);
- if ( mySessionId != null )
- myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
+ // prepare event
+ ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
+ evt.setOperation(operation.getName());
+ evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
+ evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
+ evt.setMexId(mexDao.getMessageExchangeId());
+ sendEvent(evt);
- mex.setStatus(MessageExchange.Status.REQUEST);
- myRoleMex.invoke(odeRequest);
- // Can't expect any sync response
- mex.replyAsync();
- } else {
- // If we couldn't find the endpoint, then there is no sense
- // in asking the IL to invoke.
- if (partnerEpr != null) {
- mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
- mex.setStatus(MessageExchange.Status.REQUEST);
- _bpelProcess._engine._contexts.mexContext.invokePartner(mex);
- } else {
- __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
- mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
- }
+ if (__log.isDebugEnabled()) {
+ __log.debug("INVOKING PARTNER: partnerLink=" + partnerLink + ", op=" + operation.getName() + " channel="
+ + channel + ")");
}
- evt.setMexId(mexDao.getMessageExchangeId());
- sendEvent(evt);
+ _bpelProcess.invokePartner(mexDao);
+
- // MEX pattern is request only, at this point the status can only be a one way
- if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_ONLY.toString())) {
- mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
- // This mex can now be released
- mexDao.release();
- }
- // Check if there is a synchronous response, if so, we need to inject the
- // message on the response channel.
- switch (mex.getStatus()) {
- case NEW:
- throw new AssertionError("Impossible!");
- case ASYNC:
- break;
- case RESPONSE:
- case FAULT:
- case FAILURE:
- invocationResponse(mex);
- break;
- default:
- __log.error("Partner did not acknowledge message exchange: " + mex);
- mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null);
- invocationResponse(mex);
+ // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
+ // we need to inject a message on the response channel, so that the process continues.
+ switch (mexDao.getStatus()) {
+ case ACK:
+ injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+ break;
+ case ASYNC:
+ // we'll have to wait for the response.
+ break;
+ default:
+ throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus());
}
-
+
return mexDao.getMessageExchangeId();
}
void execute() {
+ if (!_contexts.isTransacted())
+ throw new BpelEngineException("MUST RUN IN TRANSACTION!");
+ if (_executed)
+ throw new IllegalStateException("cannot call execute() twice!");
+
long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
+
+ // Execute the process state reductions
boolean canReduce = true;
while (ProcessState.canExecute(_dao.getState()) && System.currentTimeMillis() < maxTime && canReduce) {
canReduce = _vpu.execute();
}
+
_dao.setLastActiveTime(new Date());
if (!ProcessState.isFinished(_dao.getState())) {
- if (__log.isDebugEnabled()) __log.debug("Setting execution state on instance " + _iid);
- _soup.setGlobalData(_outstandingRequests);
-
- if (_bpelProcess.isInMemory()) {
- // don't serialize in-memory processes
- ((ProcessInstanceDaoImpl) _dao).setSoup(_soup);
- } else {
- ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
- try {
- _soup.write(bos);
- bos.close();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- _dao.setExecutionState(bos.toByteArray());
- }
+ saveState();
if (ProcessState.canExecute(_dao.getState()) && canReduce) {
// Max time exceeded (possibly an infinite loop).
if (__log.isDebugEnabled())
__log.debug("MaxTime exceeded for instance # " + _iid);
+
try {
WorkEvent we = new WorkEvent();
we.setIID(_iid);
+ we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.RESUME);
- we.setInMem(_bpelProcess.isInMemory());
- if (_bpelProcess.isInMemory())
- _bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
- else
- _bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), new Date());
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), new Date());
} catch (ContextException e) {
__log.error("Failed to schedule resume task.", e);
throw new BpelEngineException(e);
@@ -875,7 +773,24 @@
}
}
- void inputMsgMatch(final String responsechannel, final int idx, MyRoleMessageExchangeImpl mex) {
+ private void saveState() {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
+ try {
+ _soup.write(bos);
+ bos.close();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ int newcount = _dao.getExecutionStateCounter() + 1;
+ _dao.setExecutionStateCounter(newcount);
+ _dao.setExecutionState(bos.toByteArray());
+ _instanceWorker.setCachedState(newcount, _soup);
+
+ __log.debug("CACHE SAVE: #" + newcount + " for instance " + _dao.getInstanceId());
+ }
+
+ void injectMyRoleMessageExchange(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
// if we have a message match, this instance should be marked
// active if it isn't already
if (_dao.getState() == ProcessState.STATE_READY) {
@@ -892,9 +807,9 @@
sendEvent(evt);
}
- _outstandingRequests.associate(responsechannel, mex.getMessageExchangeId());
+ getORM().associate(responsechannel, mexdao.getMessageExchangeId());
- final String mexId = mex.getMessageExchangeId();
+ final String mexId = mexdao.getMessageExchangeId();
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 3168964409165899533L;
@@ -905,15 +820,15 @@
});
}
- void timerEvent(final String timerResponseChannel) {
+ boolean injectTimerEvent(final String timerResponseChannel) {
// In case this is a pick event, we remove routes,
// and cancel the outstanding requests.
_dao.getProcess().removeRoutes(timerResponseChannel, _dao);
- _outstandingRequests.cancel(timerResponseChannel);
+ getORM().cancel(timerResponseChannel);
// Ignore timer events after the process is finished.
if (ProcessState.isFinished(_dao.getState())) {
- return;
+ return false;
}
_vpu.inject(new JacobRunnable() {
@@ -924,7 +839,8 @@
responseChannel.onTimeout();
}
});
- execute();
+
+ return true;
}
public void cancel(final TimerResponseChannel timerResponseChannel) {
@@ -932,7 +848,7 @@
// receive/reply association.
final String id = timerResponseChannel.export();
_dao.getProcess().removeRoutes(id, _dao);
- _outstandingRequests.cancel(id);
+ getORM().cancel(id);
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 6157913683737696396L;
@@ -944,32 +860,28 @@
});
}
- void invocationResponse(PartnerRoleMessageExchangeImpl mex) {
- invocationResponse(mex.getDAO().getMessageExchangeId(), mex.getDAO().getChannel());
- }
-
- void invocationResponse(final String mexid, final String responseChannelId) {
+ void injectPartnerResponse(final String mexid, final String responseChannelId) {
if (responseChannelId == null)
throw new NullPointerException("Null responseChannelId");
if (mexid == null)
throw new NullPointerException("Null mexId");
if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Invoking message response for mexid " + mexid + " and channel " + responseChannelId);
+ __log.debug("<invoke> response for mexid " + mexid + " and channel " + responseChannelId);
}
_vpu.inject(new BpelJacobRunnable() {
private static final long serialVersionUID = -1095444335740879981L;
public void run() {
- ((BpelRuntimeContextImpl) getBpelRuntimeContext()).invocationResponse2(mexid, importChannel(
- responseChannelId, InvokeResponseChannel.class));
+ ((BpelRuntimeContextImpl) getBpelRuntimeContext()).invocationResponse2(mexid, importChannel(responseChannelId,
+ InvokeResponseChannel.class));
}
});
}
/**
* Continuation of the above.
- *
+ *
* @param mexid
* @param responseChannel
*/
@@ -982,23 +894,23 @@
evt.setMexId(mexid);
evt.setOperation(mex.getOperation());
- MessageExchange.Status status = MessageExchange.Status.valueOf(mex.getStatus());
+ MessageExchange.Status status = mex.getStatus();
- switch (status) {
- case FAULT:
- evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
- responseChannel.onFault();
- break;
- case RESPONSE:
- evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT);
- responseChannel.onResponse();
- break;
- case FAILURE:
- evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE);
- responseChannel.onFailure();
- break;
- default:
- __log.error("Invalid response state for mex " + mexid + ": " + status);
+ switch (mex.getAckType()) {
+ case FAULT:
+ evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
+ responseChannel.onFault();
+ break;
+ case RESPONSE:
+ evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT);
+ responseChannel.onResponse();
+ break;
+ case FAILURE:
+ evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE);
+ responseChannel.onFailure();
+ break;
+ default:
+ __log.error("Invalid response state for mex " + mexid + ": " + status);
}
sendEvent(evt);
}
@@ -1014,15 +926,14 @@
_bpelProcess._debugger.onEvent(event);
// notify the listeners
- _bpelProcess._engine.fireEvent(event);
+ _bpelProcess._server.fireEvent(event);
// saving
_bpelProcess.saveEvent(event, _dao);
}
/**
- * We record all values of properties of a 'MessageType' variable for
- * efficient lookup.
+ * We record all values of properties of a 'MessageType' variable for efficient lookup.
*/
private void writeProperties(VariableInstance variable, Node value, XmlDataDAO dao) {
if (variable.declaration.type instanceof OMessageVarType) {
@@ -1030,8 +941,7 @@
OProcess.OPropertyAlias alias = property.getAlias(variable.declaration.type);
if (alias != null) {
try {
- String val = _bpelProcess.extractProperty((Element) value, alias, variable.declaration
- .getDescription());
+ String val = _bpelProcess.extractProperty((Element) value, alias, variable.declaration.getDescription());
if (val != null) {
dao.setProperty(property.name.toString(), val);
}
@@ -1039,68 +949,50 @@
// This will fail as we're basically trying to extract properties on all
// received messages for optimization purposes.
if (__log.isDebugEnabled())
- __log.debug("Couldn't extract property '" + property.toString()
- + "' in property pre-extraction: " + e.toString());
+ __log.debug("Couldn't extract property '" + property.toString() + "' in property pre-extraction: "
+ + e.toString());
}
}
}
}
}
- private void completeOutstandingMessageExchanges() {
- String[] mexRefs = _outstandingRequests.releaseAll();
+ /**
+ * Called when the process completes to clean up any outstanding message exchanges.
+ *
+ */
+ private void cleanupOutstandingMyRoleExchanges(FaultData optionalFaultData) {
+ String[] mexRefs = getORM().releaseAll();
for (String mexId : mexRefs) {
MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
if (mexDao != null) {
- MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
- switch (mex.getStatus()) {
- case ASYNC:
- case RESPONSE:
- mex.setStatus(MessageExchange.Status.COMPLETED_OK);
- break;
- case REQUEST:
- if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
- mex.setStatus(MessageExchange.Status.COMPLETED_OK);
- break;
- }
- default:
- mex.setFailure(FailureType.OTHER, "No response.", null);
- _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
- mex.release();
+ Status status = mexDao.getStatus();
+ MessageExchangePattern pattern = MessageExchange.MessageExchangePattern.valueOf(mexDao.getPattern());
+ InvocationStyle istyle = mexDao.getInvocationStyle();
+ if (pattern == MessageExchangePattern.REQUEST_ONLY) {
+ mexDao.setAckType(AckType.ONEWAY);
+ mexDao.setStatus(Status.COMPLETED);
+ continue;
}
+
+ mexDao.setAckType(AckType.FAILURE);
+ mexDao.setFailureType(FailureType.NO_RESPONSE);
+ if (optionalFaultData != null) {
+ mexDao.setFaultExplanation(optionalFaultData.toString());
+ }
+ mexDao.setFaultExplanation("Process completed without responding.");
+ mexDao.setStatus(Status.ACK);
+ _bpelProcess.onMyRoleMexAck(mexDao, status);
}
}
}
- private void faultOutstandingMessageExchanges(FaultData faultData) {
- String[] mexRefs = _outstandingRequests.releaseAll();
- for (String mexId : mexRefs) {
- MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
- if (mexDao != null) {
- MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
- _bpelProcess.initMyRoleMex(mex);
-
- Message message = mex.createMessage(faultData.getFaultName());
- if (faultData.getFaultMessage() != null)
- message.setMessage(faultData.getFaultMessage());
- mex.setResponse(message);
-
- mex.setFault(faultData.getFaultName(), message);
- mex.setFaultExplanation(faultData.getExplanation());
- _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
- }
- }
+ private OutstandingRequestManager getORM() {
+ return (OutstandingRequestManager) _soup.getGlobalData();
}
- private void failOutstandingMessageExchanges() {
- String[] mexRefs = _outstandingRequests.releaseAll();
- for (String mexId : mexRefs) {
- MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
- MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
- _bpelProcess.initMyRoleMex(mex);
- mex.setFailure(FailureType.OTHER, "No response.", null);
- _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
- }
+ private void cleanupOutstandingMyRoleExchanges() {
+ cleanupOutstandingMyRoleExchanges(null);
}
public Element getPartnerResponse(String mexId) {
@@ -1123,33 +1015,27 @@
throw new BpelEngineException(msg);
}
- MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
- switch (status) {
- case ASYNC:
- case REQUEST:
- MessageDAO request = dao.getRequest();
- if (request == null) {
- // this also should not happen
- String msg = "Engine requested request for message exchange that did not have one: " + mexId;
- __log.fatal(msg);
- throw new BpelEngineException(msg);
- }
-
- return request.getData();
-
- default:
- // We should not be in any other state when requesting this.
- String msg = "Engine requested response while the message exchange " + mexId + " was in the state "
- + status;
- __log.fatal(msg);
- throw new BpelEngineException(msg);
+ MessageDAO request = dao.getRequest();
+ if (request == null) {
+ // this also should not happen
+ String msg = "Engine requested request for message exchange that did not have one: " + mexId;
+ __log.fatal(msg);
+ throw new BpelEngineException(msg);
}
+ return request.getData();
+
}
public QName getPartnerFault(String mexId) {
- MessageExchangeDAO mex = _getPartnerResponse(mexId).getMessageExchange();
- return mex.getFault();
+ MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
+ if (dao == null) {
+ // this should not happen....
+ String msg = "Engine requested non-existent message exchange: " + mexId;
+ __log.fatal(msg);
+ throw new BpelEngineException(msg);
+ }
+ return dao.getFault();
}
public QName getPartnerResponseType(String mexId) {
@@ -1177,24 +1063,20 @@
}
MessageDAO response;
- MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
- switch (status) {
- case FAULT:
- case RESPONSE:
- response = dao.getResponse();
- if (response == null) {
- // this also should not happen
- String msg = "Engine requested response for message exchange that did not have one: " + mexId;
- __log.fatal(msg);
- throw new BpelEngineException(msg);
- }
- break;
- default:
- // We should not be in any other state when requesting this.
- String msg = "Engine requested response while the message exchange " + mexId + " was in the state "
- + status;
+ MessageExchange.Status status = dao.getStatus();
+ if (status == Status.ACK) {
+ response = dao.getResponse();
+ if (response == null) {
+ // this also should not happen
+ String msg = "Engine requested response for message exchange that did not have one: " + mexId;
__log.fatal(msg);
throw new BpelEngineException(msg);
+ }
+ } else {
+ // We should not be in any other state when requesting this.
+ String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + status;
+ __log.fatal(msg);
+ throw new BpelEngineException(msg);
}
return response;
}
@@ -1236,14 +1118,14 @@
return dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
}
- public void registerActivityForRecovery(ActivityRecoveryChannel channel, long activityId, String reason,
- Date dateTime, Element details, String[] actions, int retries) {
+ public void registerActivityForRecovery(ActivityRecoveryChannel channel, long activityId, String reason, Date dateTime,
+ Element details, String[] actions, int retries) {
if (reason == null)
reason = "Unspecified";
if (dateTime == null)
dateTime = new Date();
- __log.info("ActivityRecovery: Registering activity " + activityId + ", failure reason: " + reason +
- " on channel " + channel.export());
+ __log.info("ActivityRecovery: Registering activity " + activityId + ", failure reason: " + reason + " on channel "
+ + channel.export());
_dao.createActivityRecovery(channel.export(), (int) activityId, reason, dateTime, details, actions, retries);
}
@@ -1257,8 +1139,8 @@
public void run() {
ActivityRecoveryChannel recovery = importChannel(channel, ActivityRecoveryChannel.class);
- __log.info("ActivityRecovery: Recovering activity " + activityId + " with action " + action +
- " on channel " + recovery);
+ __log.info("ActivityRecovery: Recovering activity " + activityId + " with action " + action + " on channel "
+ + recovery);
if (recovery != null) {
if ("cancel".equals(action))
recovery.cancel();
@@ -1269,7 +1151,7 @@
}
}
});
- //_dao.deleteActivityRecovery(channel);
+ // _dao.deleteActivityRecovery(channel);
execute();
}
@@ -1291,48 +1173,5 @@
__log.debug("initializing partner " + pLink + " sessionId to " + session);
fetchPartnerLinkDAO(pLink).setPartnerSessionId(session);
- }
-
- /**
- * Attempt to match message exchanges on a correlator.
- *
- */
- public void matcherEvent(String correlatorId, CorrelationKey ckey) {
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
- }
- CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
-
- // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
- // So we want to acquire the lock before we do anthing else.
- MessageRouteDAO mroute = correlator.findRoute(ckey);
- if (mroute == null) {
- // Ok, this means that a message arrived before we did, so nothing to do.
- __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
- return;
- }
-
- // Now see if there is a message that matches this selector.
- MessageExchangeDAO mexdao = correlator.dequeueMessage(ckey);
- if (mexdao != null) {
- __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
-
- // We have a match, so we can get rid of the routing entries.
- correlator.removeRoutes(mroute.getGroupId(),_dao);
-
- // Found message matching one of our selectors.
- if (BpelProcess.__log.isDebugEnabled()) {
- BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao
- + " on CKEY " + ckey);
- }
-
- MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexdao);
-
- inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex);
- execute();
- } else {
- __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
-
- }
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Sep 5 22:46:42 2007
@@ -19,33 +19,45 @@
package org.apache.ode.bpel.engine;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngine;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelEventListener;
import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.utils.msg.MessageBundle;
@@ -56,45 +68,55 @@
* <p>
* The BPEL server implementation.
* </p>
- *
+ *
* <p>
- * This implementation is intended to be thread safe. The key concurrency
- * mechanism is a "management" read/write lock that synchronizes all management
- * operations (they require "write" access) and prevents concurrent management
- * operations and processing (processing requires "read" access). Write access
- * to the lock is scoped to the method, while read access is scoped to a
+ * This implementation is intended to be thread safe. The key concurrency mechanism is a "management" read/write lock that
+ * synchronizes all management operations (they require "write" access) and prevents concurrent management operations and processing
+ * (processing requires "read" access). Write access to the lock is scoped to the method, while read access is scoped to a
* transaction.
* </p>
- *
+ *
* @author Maciej Szefler <mszefler at gmail dot com>
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
+
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Maximum age of a process before it is quiesced */
private static Long __processMaxAge;
- /**
- * Set of processes that are registered with the server. Includes hydrated and dehydrated processes.
- * Guarded by _mngmtLock.writeLock().
+ /** RNG, for delays */
+ private Random _random = new Random(System.currentTimeMillis());
+
+ private static double _delayMean = 0;
+
+ /**
+ * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by
+ * _mngmtLock.writeLock().
*/
- private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>();
+ private final HashMap<QName, BpelProcess> _registeredProcesses = new HashMap<QName, BpelProcess>();
+
+ /** Mapping from myrole service name to active process. */
+ private final HashMap<QName, BpelProcess> _serviceMap = new HashMap<QName, BpelProcess>();
private State _state = State.SHUTDOWN;
- private Contexts _contexts = new Contexts();
+
+ Contexts _contexts = new Contexts();
+
private DehydrationPolicy _dehydrationPolicy;
+
private Properties _configProperties;
-
- BpelEngineImpl _engine;
+
+ private ExecutorService _exec;
+
BpelDatabase _db;
/**
- * Management lock for synchronizing management operations and preventing
- * processing (transactions) from occuring while management operations are
- * in progress.
+ * Management lock for synchronizing management operations and preventing processing (transactions) from occuring while
+ * management operations are in progress.
*/
private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
@@ -121,7 +143,7 @@
public BpelServerImpl() {
}
-
+
public void start() {
_mngmtLock.writeLock().lock();
try {
@@ -132,46 +154,65 @@
__log.debug("BPEL SERVER starting.");
+
+ if (_exec == null)
+ _exec = Executors.newCachedThreadPool();
+
+ if (_contexts.txManager == null) {
+ String errmsg = "Transaction manager not specified; call setTransactionManager(...)!";
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+
+ if (_contexts.scheduler == null) {
+ String errmsg = "Scheduler not specified; call setScheduler(...)!";
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+
_contexts.scheduler.start();
_state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
- if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start();
+ if (_dehydrationPolicy != null)
+ new Thread(new ProcessDefReaper()).start();
} finally {
_mngmtLock.writeLock().unlock();
}
}
/**
- * Register a global listener to receive {@link BpelEvent}s froom all
- * processes.
+ * Register a global listener to receive {@link BpelEvent}s froom all processes.
+ *
* @param listener
*/
public void registerBpelEventListener(BpelEventListener listener) {
+ listener.startup(_configProperties);
+
// Do not synchronize, eventListeners is copy-on-write array.
- listener.startup(_configProperties);
- _contexts.eventListeners.add(listener);
+ _contexts.eventListeners.add(listener);
}
/**
- * Unregister a global listener from receive {@link BpelEvent}s from all
- * processes.
+ * Unregister a global listener from receive {@link BpelEvent}s from all processes.
+ *
* @param listener
*/
public void unregisterBpelEventListener(BpelEventListener listener) {
// Do not synchronize, eventListeners is copy-on-write array.
- try {
- listener.shutdown();
- } catch (Exception e) {
- __log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e);
- } finally {
- _contexts.eventListeners.remove(listener);
- }
+ if (_contexts.eventListeners.remove(listener)) {
+ try {
+ listener.shutdown();
+ } catch (Exception e) {
+ __log.warn("Stopping BPEL event listener " + listener.getClass().getName()
+ + " failed, nevertheless it has been unregistered.", e);
+ }
+ }
}
-
+
private void unregisterBpelEventListeners() {
- for (BpelEventListener l : _contexts.eventListeners) {
- unregisterBpelEventListener(l);
- }
+ for (BpelEventListener l : _contexts.eventListeners) {
+ unregisterBpelEventListener(l);
+ }
}
public void stop() {
@@ -185,7 +226,6 @@
__log.debug("BPEL SERVER STOPPING");
_contexts.scheduler.stop();
- _engine = null;
_state = State.INIT;
__log.info(__msgs.msgServerStopped());
} finally {
@@ -201,10 +241,8 @@
__log.debug("BPEL SERVER initializing ");
- _db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
+ _db = new BpelDatabase(_contexts);
_state = State.INIT;
-
- _engine = new BpelEngineImpl(_contexts);
} finally {
_mngmtLock.writeLock().unlock();
@@ -218,7 +256,6 @@
unregisterBpelEventListeners();
_db = null;
- _engine = null;
_state = State.SHUTDOWN;
} finally {
_mngmtLock.writeLock().unlock();
@@ -226,26 +263,6 @@
}
- public BpelEngine getEngine() {
- boolean registered = false;
- _mngmtLock.readLock().lock();
- try {
- _contexts.scheduler.registerSynchronizer(new Synchronizer() {
- public void afterCompletion(boolean success) {
- _mngmtLock.readLock().unlock();
- }
- public void beforeCompletion() {
- }
- });
- registered = true;
- } finally {
- // If we failed to register the synchro,then there was an ex/throwable; we need to unlock now.
- if (!registered)
- _mngmtLock.readLock().unlock();
- }
- return _engine;
- }
-
public void register(ProcessConf conf) {
if (conf == null)
throw new NullPointerException("must specify non-null process configuration.");
@@ -263,17 +280,23 @@
try {
// If the process is already active, do nothing.
- if (_engine.isProcessRegistered(conf.getProcessId())) {
+ if (_registeredProcesses.containsKey(conf.getProcessId())) {
__log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered");
return;
}
__log.debug("Registering process " + conf.getProcessId() + " with server.");
- BpelProcess process = new BpelProcess(conf);
+ BpelProcess process = new BpelProcess(this, conf, null);
+
+ for (Endpoint e : process.getServiceNames()) {
+ __log.debug("Register process: serviceId=" + e + ", process=" + process);
+ _serviceMap.put(e.serviceName, process);
+ }
- _engine.registerProcess(process);
- _registeredProcesses.add(process);
+ process.activate(_contexts);
+
+ _registeredProcesses.put(process.getPID(), process);
process.hydrate();
__log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
@@ -294,11 +317,13 @@
}
try {
- BpelProcess p = null;
- if (_engine != null) {
- _engine.unregisterProcess(pid);
- _registeredProcesses.remove(p);
- }
+ BpelProcess p = _registeredProcesses.remove(pid);
+ if (p == null)
+ return;
+
+ p.deactivate();
+ while (_serviceMap.values().remove(p))
+ ;
__log.info(__msgs.msgProcessUnregistered(pid));
@@ -312,7 +337,9 @@
/**
* Register a global message exchange interceptor.
- * @param interceptor message-exchange interceptor
+ *
+ * @param interceptor
+ * message-exchange interceptor
*/
public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
// NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -321,7 +348,9 @@
/**
* Unregister a global message exchange interceptor.
- * @param interceptor message-exchange interceptor
+ *
+ * @param interceptor
+ * message-exchange interceptor
*/
public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
// NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -329,6 +358,28 @@
}
/**
+ * Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes
+ * would not be registered under the same service qname but different endpoint.
+ *
+ * @param service
+ * target service id
+ * @param request
+ * request message
+ * @return process corresponding to the targetted service, or <code>null</code> if service identifier is not recognized.
+ */
+ BpelProcess route(QName service, Message request) {
+ // TODO: use the message to route to the correct service if more than
+ // one service is listening on the same endpoint.
+
+ _mngmtLock.readLock().lock();
+ try {
+ return _serviceMap.get(service);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+ /**
* Check a state transition from state "i" to state "j".
*/
private final boolean checkState(State i, State j) {
@@ -360,10 +411,273 @@
}
}
- public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
- getEngine().onScheduledJob(jobInfo);
+ public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
+ _mngmtLock.readLock().lock();
+ try {
+ final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+ BpelProcess process = _registeredProcesses.get(we.getProcessId());
+ if (process == null) {
+ // If the process is not active, it means that we should not be
+ // doing any work on its behalf, therefore we will reschedule the
+ // events for some time in the future (1 minute).
+ _contexts.execTransaction(new Callable<Void>() {
+
+ public Void call() throws Exception {
+ _contexts.scheduler.jobCompleted(jobInfo.jobName);
+ Date future = new Date(System.currentTimeMillis() + (60 * 1000));
+ __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
+ return null;
+ }
+
+ });
+ return;
+ }
+
+ process.handleWorkEvent(jobInfo);
+ } catch (Exception ex) {
+ throw new JobProcessorException(ex, true);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+ public void setTransactionManager(TransactionManager txm) {
+ _contexts.txManager = txm;
+ }
+
+ public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
+ _dehydrationPolicy = dehydrationPolicy;
+ }
+
+ public void setConfigProperties(Properties configProperties) {
+ _configProperties = configProperties;
+ }
+
+ public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
+ _contexts.mexContext = mexContext;
+ }
+
+ public void setScheduler(Scheduler scheduler) throws BpelEngineException {
+ _contexts.scheduler = scheduler;
+ }
+
+ public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
+ _contexts.eprContext = eprContext;
+ }
+
+ /**
+ * Set the DAO connection factory. The DAO is used by the BPEL engine to persist information about active processes.
+ *
+ * @param daoCF
+ * {@link BpelDAOConnectionFactory} implementation.
+ */
+ public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
+ _contexts.dao = daoCF;
+ }
+
+ public void setBindingContext(BindingContext bc) {
+ _contexts.bindingContext = bc;
+ }
+
+ public MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName targetService,
+ final String operation, final String clientKey) throws BpelEngineException {
+
+ _mngmtLock.readLock().lock();
+ try {
+ final BpelProcess target = route(targetService, null);
+
+ if (target == null)
+ throw new BpelEngineException("NoSuchService: " + targetService);
+
+ if (istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
+ assertTransaction();
+ else
+ assertNoTransaction();
+
+
+ return target.createNewMyRoleMex(istyle, targetService, operation, clientKey);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+ public MessageExchange getMessageExchange(final String mexId) throws BpelEngineException {
+
+ _mngmtLock.readLock().lock();
+ try {
+ final MessageExchangeDAO inmemdao = getInMemMexDAO(mexId);
+
+ Callable<MessageExchange> loadMex = new Callable<MessageExchange>() {
+
+ public MessageExchange call() {
+ MessageExchangeDAO mexdao = (inmemdao == null) ? mexdao = _contexts.dao.getConnection().getMessageExchange(
+ mexId) : inmemdao;
+ if (mexdao == null)
+ return null;
+
+ ProcessDAO pdao = mexdao.getProcess();
+ BpelProcess process = pdao == null ? null : _registeredProcesses.get(pdao.getProcessId());
+
+ if (process == null) {
+ String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
+ __log.error(errmsg);
+ // TODO: Perhaps we should define a checked exception for this
+ // condition.
+ throw new BpelEngineException(errmsg);
+ }
+
+ InvocationStyle istyle = mexdao.getInvocationStyle();
+ if (istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
+ assertTransaction();
+
+ switch (mexdao.getDirection()) {
+ case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
+ return process.createPartnerRoleMex(mexdao);
+ case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
+ return process.lookupMyRoleMex(mexdao);
+ default:
+ String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
+ __log.fatal(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+ }
+ };
+
+ try {
+ if (inmemdao != null || _contexts.isTransacted())
+ return loadMex.call();
+ else
+ return enqueueTransaction(loadMex).get();
+ } catch (ContextException e) {
+ throw new BpelEngineException(e);
+ } catch (Exception e) {
+ throw new BpelEngineException(e);
+ }
+
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+
+ }
+
+ public MessageExchange getMessageExchangeByForeignKey(String foreignKey) throws BpelEngineException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) {
+
+ _mngmtLock.readLock().lock();
+ try {
+ BpelProcess process = _serviceMap.get(serviceId);
+ if (process == null)
+ throw new BpelEngineException("No such service: " + serviceId);
+ return process.getSupportedInvocationStyle(serviceId);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+ MessageExchangeDAO getInMemMexDAO(String mexId) {
+ _mngmtLock.readLock().lock();
+ try {
+ for (BpelProcess p : _registeredProcesses.values()) {
+ MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
+ if (mexDao != null)
+ return mexDao;
+ }
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+
+ return null;
+ }
+
+ OProcess getOProcess(QName processId) {
+ _mngmtLock.readLock().lock();
+ try {
+ BpelProcess process = _registeredProcesses.get(processId);
+
+ if (process == null)
+ return null;
+
+ return process.getOProcess();
+
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+
+ <T> Future<T> enqueueTransaction(final Callable<T> transaction) throws ContextException {
+ return _exec.submit(new ServerCallable<T>(new TransactedCallable<T>(transaction)));
+ }
+
+ void enqueueRunnable(final Runnable runnable) {
+ _exec.submit(new ServerRunnable(runnable));
+ }
+
+ /**
+ * Schedule a {@link Runnable} object for execution after the completion of the current transaction.
+ * @param runnable
+ */
+ void scheduleRunnable(final Runnable runnable) {
+ assertTransaction();
+ _contexts.registerCommitSynchronizer(new Runnable() {
+
+ public void run() {
+ _exec.submit(new ServerRunnable(runnable));
+ }
+
+ });
+
}
+
+ protected void assertTransaction() {
+ if (!_contexts.isTransacted())
+ throw new BpelEngineException("Operation must be performed in a transaction!");
+ }
+
+ protected void assertNoTransaction() {
+ if (_contexts.isTransacted())
+ throw new BpelEngineException("Operation must be performed outside of a transaction!");
+ }
+
+ void fireEvent(BpelEvent event) {
+ // Note that the eventListeners list is a copy-on-write array, so need
+ // to mess with synchronization.
+ for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
+ l.onEvent(event);
+ }
+ }
+
+ /**
+ * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially
+ * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
+ */
+ private void debuggingDelay() {
+ // Do a delay for debugging purposes.
+ if (_delayMean != 0)
+ try {
+ long delay = randomExp(_delayMean);
+ // distribution
+ // with mean
+ // _delayMean
+ __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ ; // ignore
+ }
+ }
+
+ private long randomExp(double mean) {
+ double u = _random.nextDouble(); // Uniform
+ long delay = (long) (-Math.log(u) * mean); // Exponential
+ return delay;
+ }
+
private class ProcessDefReaper implements Runnable {
public void run() {
__log.debug("Starting process definition reaper thread.");
@@ -372,16 +686,16 @@
while (true) {
Thread.sleep(pollingTime);
_mngmtLock.writeLock().lockInterruptibly();
- try {
+ try {
__log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
- // Copying the runnning process list to avoid synchronization
+ // Copying the runnning process list to avoid synchronizatMessageExchangeInterion
// problems and a potential mess if a policy modifies the list
- List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses);
+ List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses.values());
CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() {
public boolean isMember(BpelProcess o) {
return !o.hintIsHydrated();
}
-
+
});
// And the happy winners are...
@@ -401,43 +715,81 @@
}
}
- public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
- _dehydrationPolicy = dehydrationPolicy;
+ public BpelProcess getBpelProcess(QName processId) {
+ _mngmtLock.readLock().lock();
+ try {
+ return _registeredProcesses.get(processId);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
}
- public void setConfigProperties(Properties configProperties) {
- _configProperties = configProperties;
+
+
+
+ class ServerRunnable implements Runnable {
+ final Runnable _work;
+ ServerRunnable(Runnable work) {
+ _work = work;
+ }
+
+ public void run() {
+ _mngmtLock.readLock().lock();
+ try {
+ _work.run();
+ } catch (Throwable ex) {
+ __log.fatal("Internal Error", ex);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
}
- public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
- _contexts.mexContext = mexContext;
+
+
+ class ServerCallable<T> implements Callable<T>{
+ final Callable<T> _work;
+ ServerCallable(Callable<T> work) {
+ _work = work;
+ }
+
+ public T call () throws Exception {
+ _mngmtLock.readLock().lock();
+ try {
+ return _work.call();
+ } catch (Exception ex) {
+ __log.fatal("Internal Error", ex);
+ throw ex;
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
}
- public void setScheduler(Scheduler scheduler) throws BpelEngineException {
- _contexts.scheduler = scheduler;
- }
+ class TransactedCallable<T> implements Callable<T> {
+ Callable<T> _work;
- public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
- _contexts.eprContext = eprContext;
- }
+ TransactedCallable(Callable<T> work) {
+ _work = work;
+ }
- /**
- * Set the DAO connection factory. The DAO is used by the BPEL engine to
- * persist information about active processes.
- *
- * @param daoCF
- * {@link BpelDAOConnectionFactory} implementation.
- */
- public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
- _contexts.dao = daoCF;
+ public T call() throws Exception {
+ return _contexts.execTransaction(_work);
+ }
}
- public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
- _contexts.inMemDao = daoCF;
- }
- public void setBindingContext(BindingContext bc) {
- _contexts.bindingContext = bc;
- }
+ class TransactedRunnable implements Runnable {
+ Runnable _work;
+
+ TransactedRunnable(Runnable work) {
+ _work = work;
+ }
+ public void run() {
+ _contexts.execTransaction(_work);
+ }
+ }
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Wed Sep 5 22:46:42 2007
@@ -19,22 +19,34 @@
package org.apache.ode.bpel.engine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelEventListener;
+import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
/**
- * Aggregation of all the contexts provided to the BPEL engine by the
- * integration layer.
+ * Aggregation of all the contexts provided to the BPEL engine by the integration layer.
*/
class Contexts {
+ private static final Log __log = LogFactory.getLog(Contexts.class);
+
+ TransactionManager txManager;
MessageExchangeContext mexContext;
@@ -45,13 +57,85 @@
BindingContext bindingContext;
BpelDAOConnectionFactory dao;
- BpelDAOConnectionFactory inMemDao;
- /** Global Message-Exchange interceptors. Must be copy-on-write!!! */
- final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
+ /** Global Message-Exchange interceptors. Must be copy-on-write!!! */
+ final List<MessageExchangeInterceptor> globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
/** Global event listeners. Must be copy-on-write!!! */
final List<BpelEventListener> eventListeners = new CopyOnWriteArrayList<BpelEventListener>();
+ public boolean isTransacted() {
+ try {
+ return txManager.getStatus() == Status.STATUS_ACTIVE;
+ } catch (SystemException e) {
+ throw new BpelEngineException(e);
+ }
+ }
+
+ public void execTransaction(final Runnable transaction) {
+ try {
+ execTransaction(new Callable<Void>() {
+
+ public Void call() throws Exception {
+ transaction.run();
+ return null;
+ }
+
+ });
+ } catch (Exception e) {
+ throw new BpelEngineException(e);
+ }
+
+ }
+
+ public <T> T execTransaction(Callable<T> transaction) throws Exception{
+ try {
+ txManager.begin();
+ } catch (Exception ex) {
+ String errmsg = "Internal Error, could not begin transaction.";
+ throw new BpelEngineException(errmsg, ex);
+ }
+ boolean success = false;
+ try {
+ T retval = transaction.call();
+ success = true;
+ return retval;
+ } catch (Exception ex) {
+ throw ex;
+ } finally {
+ if (success)
+ try {
+ txManager.commit();
+ } catch (Exception ex) {
+ __log.error("Commit failed.", ex);
+ throw new BpelEngineException("Commit failed.", ex);
+ }
+ else
+ try {
+ txManager.rollback();
+ } catch (Exception ex) {
+ __log.error("Transaction rollback failed.", ex);
+ }
+ }
+ }
+
+ public void registerCommitSynchronizer(final Runnable runnable) {
+ try {
+ txManager.getTransaction().registerSynchronization(new Synchronization() {
+
+ public void afterCompletion(int status) {
+ if (status == Status.STATUS_COMMITTED)
+ runnable.run();
+ }
+
+ public void beforeCompletion() {
+
+ }
+
+ });
+ } catch (Exception ex) {
+ throw new BpelEngineException("Error registering synchronizer." ,ex);
+ }
+ }
}