You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/01/10 18:28:23 UTC
svn commit: r494901 -
/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Author: mszefler
Date: Wed Jan 10 09:28:20 2007
New Revision: 494901
URL: http://svn.apache.org/viewvc?view=rev&rev=494901
Log:
Messages where the correlation keys were not found in the correct place were causing rollbacks. Fixed to cause message exchange failure.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=494901&r1=494900&r2=494901
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Jan 10 09:28:20 2007
@@ -98,7 +98,8 @@
/** Last time the process was used. */
volatile long _lastUsed;
- public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger, ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) {
+ public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger,
+ ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) {
_pid = conf.getProcessId();
_pconf = conf;
@@ -111,8 +112,8 @@
for (Map.Entry<String, Endpoint> provide : conf.getProvideEndpoints().entrySet()) {
OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
if (plink == null) {
- String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
- + provide.getKey();
+ String errmsg = "Error in deployment descriptor for process " + _pid
+ + "; reference to unknown partner link " + provide.getKey();
__log.error(errmsg);
throw new BpelEngineException(errmsg);
}
@@ -124,13 +125,13 @@
for (Map.Entry<String, Endpoint> invoke : conf.getInvokeEndpoints().entrySet()) {
OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
if (plink == null) {
- String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
- + invoke.getKey();
+ String errmsg = "Error in deployment descriptor for process " + _pid
+ + "; reference to unknown partner link " + invoke.getKey();
__log.error(errmsg);
throw new BpelEngineException(errmsg);
}
- __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey() + " --> "
- + invoke.getValue());
+ __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey()
+ + " --> " + invoke.getValue());
partnerRoleIntialValues.put(plink, invoke.getValue());
}
@@ -146,7 +147,8 @@
}
if (pl.hasPartnerRole()) {
- PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints().get(pl.getName()));
+ PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints()
+ .get(pl.getName()));
_partnerRoles.put(pl, partnerRole);
}
}
@@ -156,7 +158,8 @@
return "BpelProcess[" + _pid + "]";
}
- public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
+ public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action,
+ FaultData fault) {
if (__log.isDebugEnabled())
__log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
@@ -216,7 +219,8 @@
if (target != null) {
mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName()));
} else {
- __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex.");
+ __log.warn("Couldn't find endpoint from service " + mex.getServiceName()
+ + " when initializing a myRole mex.");
}
}
@@ -271,15 +275,18 @@
}
/**
- * Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned.
- * If the part is an XML schema typed part, then the name of the part is returned in the null namespace.
+ * Get the element name for a given WSDL part. If the part is an
+ * <em>element</em> part, the name of that element is returned. If the
+ * part is an XML schema typed part, then the name of the part is returned
+ * in the null namespace.
*
* @param part
* WSDL {@link javax.wsdl.Part}
* @return name of element containing said part
*/
static QName getElementNameForPart(OMessageVarType.Part part) {
- return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name);
+ return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null,
+ part.name);
}
/** Create a version-appropriate runtime context. */
@@ -293,10 +300,12 @@
*
* @param mex
* message exchange
- * @return <code>true</code> if execution should continue, <code>false</code> otherwise
+ * @return <code>true</code> if execution should continue,
+ * <code>false</code> otherwise
*/
private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
- InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(),_pconf);
+ InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
+ getProcessDAO(), _pconf);
for (MessageExchangeInterceptor i : _mexInterceptors)
if (!mex.processInterceptor(i, mex, ictx, invoker))
@@ -310,7 +319,8 @@
}
/**
- * Replacement object for serializtation of the {@link OBase} (compiled BPEL) objects in the JACOB VPU.
+ * Replacement object for serializtation of the {@link OBase} (compiled
+ * BPEL) objects in the JACOB VPU.
*/
public static final class OBaseReplacementImpl implements Externalizable {
private static final long serialVersionUID = 1L;
@@ -348,8 +358,8 @@
}
/**
- * Get the initial value of this role's EPR. This value is obtained from the integration layer when the process is enabled
- * on the server.
+ * Get the initial value of this role's EPR. This value is obtained from
+ * the integration layer when the process is enabled on the server.
*
* @return initial epr
*/
@@ -389,7 +399,8 @@
*/
public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
if (__log.isTraceEnabled()) {
- __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex }));
+ __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
+ "messageExchange", mex }));
}
Operation operation = getMyRoleOperation(mex.getOperationName());
@@ -398,7 +409,7 @@
mex.setFailure(FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
return;
}
-
+
mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
mex.setPortOp(_plinkDef.myRolePortType, operation);
mex.setPattern(operation.getOutput() == null ? MessageExchangePattern.REQUEST_ONLY
@@ -421,20 +432,26 @@
MessageRouteDAO messageRoute = null;
// We need to compute the correlation keys (based on the operation
- // we can
- // infer which correlation keys to compute - this is merely a set
+ // we can infer which correlation keys to compute - this is merely a set
// consisting of each correlationKey used in each correlation sets
- // that is
- // ever
- // referenced in an <receive>/<onMessage> on this
+ // that is ever referenced in an <receive>/<onMessage> on this
// partnerlink/operation.
- keys = computeCorrelationKeys(mex);
+ try {
+ keys = computeCorrelationKeys(mex);
+ } catch (InvalidMessageException ime) {
+ // We'd like to do a graceful exit here, no sense in rolling back due to a
+ // a message format problem.
+ __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
+ mex.setFailure(FailureType.FORMAT_ERROR, ime.getMessage(), null);
+ return;
+ }
String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
- + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
+ __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
+ + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
+ + " partnerSessionId=" + partnerSessionId);
}
CorrelationKey matchedKey = null;
@@ -476,9 +493,8 @@
BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), mex);
// send process instance event
- NewProcessInstanceEvent evt = new NewProcessInstanceEvent(
- new QName(_oprocess.targetNamespace, _oprocess.getName()), getProcessDAO().getProcessId(), newInstance
- .getInstanceId());
+ NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_oprocess.targetNamespace,
+ _oprocess.getName()), getProcessDAO().getProcessId(), newInstance.getInstanceId());
evt.setPortType(mex.getPortType().getQName());
evt.setOperation(operation.getName());
evt.setMexId(mex.getMessageExchangeId());
@@ -494,10 +510,6 @@
+ messageRoute.getTargetInstance().getInstanceId());
}
- // Attempt to acquire an instance-level lock.
- // _lockManager.lock(messageRoute.getTargetInstance().getInstanceId(),
- // 60, TimeUnit.SECONDS);
-
ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
// Reload process instance for DAO.
@@ -505,14 +517,12 @@
instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
// Kill the route so some new message does not get routed to
- // same
- // process
- // instance.
+ // same process instance.
correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
// send process instance event
- CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess.getName()),
- getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
+ CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess
+ .getName()), getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
evt.setPortType(mex.getPortType().getQName());
evt.setOperation(operation.getName());
evt.setMexId(mex.getMessageExchangeId());
@@ -537,8 +547,8 @@
} else {
// send event
- CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex.getOperation()
- .getName(), mex.getMessageExchangeId(), keys);
+ CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+ .getOperation().getName(), mex.getMessageExchangeId(), keys);
evt.setProcessId(getProcessDAO().getProcessId());
evt.setProcessName(new QName(_oprocess.targetNamespace, _oprocess.getName()));
@@ -552,8 +562,10 @@
}
}
- // Now we have to update our message exchange status. If the <reply> was not hit during the
- // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
+ // Now we have to update our message exchange status. If the <reply>
+ // was not hit during the
+ // invocation, then we will be in the "REQUEST" phase which means
+ // that either this was a one-way
// or a two-way that needs to delivery the reply asynchronously.
if (mex.getStatus() == Status.REQUEST) {
mex.setStatus(Status.ASYNC);
@@ -576,7 +588,8 @@
Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation);
for (OScope.CorrelationSet cset : csets) {
- CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()), msg);
+ CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()),
+ msg);
keys.add(key);
}
@@ -588,7 +601,8 @@
return keys.toArray(new CorrelationKey[keys.size()]);
}
- private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype, Element msg) {
+ private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
+ Element msg) {
String[] values = new String[cset.properties.size()];
int jIdx = 0;
@@ -599,8 +613,8 @@
if (alias == null) {
// TODO: Throw a real exception! And catch this at compile
// time.
- throw new IllegalArgumentException("No alias matching property '" + property.name + "' with message type '"
- + messagetype + "'");
+ throw new IllegalArgumentException("No alias matching property '" + property.name
+ + "' with message type '" + messagetype + "'");
}
String value;
@@ -635,7 +649,8 @@
__log.debug("Processing partner's response for partnerLink: " + messageExchange);
}
- BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
+ BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null,
+ null);
processInstance.invocationResponse(messageExchange);
processInstance.execute();
}
@@ -697,8 +712,8 @@
}
ProcessDAO getProcessDAO() {
- return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : _engine._contexts.dao
- .getConnection().getProcess(_pid);
+ return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid)
+ : _engine._contexts.dao.getConnection().getProcess(_pid);
}
static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -724,7 +739,8 @@
myrole._initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, myrole._endpoint,
myrole._plinkDef.myRolePortType);
- __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is " + myrole._initialEPR);
+ __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is "
+ + myrole._initialEPR);
}
for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
@@ -736,12 +752,13 @@
prole._initialEPR = epr;
}
- __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " + prole._initialEPR);
+ __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
+ + prole._initialEPR);
}
__log.debug("Activated " + _pid);
-
+
markused();
}
@@ -805,8 +822,8 @@
public long getLastUsed() {
return _lastUsed;
}
-
- /** Keep track of the time the process was last used. */
+
+ /** Keep track of the time the process was last used. */
private final void markused() {
_lastUsed = System.currentTimeMillis();
}