You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/01/10 18:28:23 UTC

svn commit: r494901 - /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java

Author: mszefler
Date: Wed Jan 10 09:28:20 2007
New Revision: 494901

URL: http://svn.apache.org/viewvc?view=rev&rev=494901
Log:
Messages where the correlation keys were not found in the correct place were causing rollbacks. Fixed to cause message exchange failure. 

Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=494901&r1=494900&r2=494901
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Jan 10 09:28:20 2007
@@ -98,7 +98,8 @@
     /** Last time the process was used. */
     volatile long _lastUsed;
 
-    public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger, ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) {
+    public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger,
+            ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) {
         _pid = conf.getProcessId();
         _pconf = conf;
 
@@ -111,8 +112,8 @@
         for (Map.Entry<String, Endpoint> provide : conf.getProvideEndpoints().entrySet()) {
             OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
             if (plink == null) {
-                String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
-                        + provide.getKey();
+                String errmsg = "Error in deployment descriptor for process " + _pid
+                        + "; reference to unknown partner link " + provide.getKey();
                 __log.error(errmsg);
                 throw new BpelEngineException(errmsg);
             }
@@ -124,13 +125,13 @@
         for (Map.Entry<String, Endpoint> invoke : conf.getInvokeEndpoints().entrySet()) {
             OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
             if (plink == null) {
-                String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
-                        + invoke.getKey();
+                String errmsg = "Error in deployment descriptor for process " + _pid
+                        + "; reference to unknown partner link " + invoke.getKey();
                 __log.error(errmsg);
                 throw new BpelEngineException(errmsg);
             }
-            __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey() + " --> "
-                    + invoke.getValue());
+            __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey()
+                    + " --> " + invoke.getValue());
 
             partnerRoleIntialValues.put(plink, invoke.getValue());
         }
@@ -146,7 +147,8 @@
             }
 
             if (pl.hasPartnerRole()) {
-                PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints().get(pl.getName()));
+                PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints()
+                        .get(pl.getName()));
                 _partnerRoles.put(pl, partnerRole);
             }
         }
@@ -156,7 +158,8 @@
         return "BpelProcess[" + _pid + "]";
     }
 
-    public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
+    public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action,
+            FaultData fault) {
         if (__log.isDebugEnabled())
             __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
 
@@ -216,7 +219,8 @@
         if (target != null) {
             mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName()));
         } else {
-            __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex.");
+            __log.warn("Couldn't find endpoint from service " + mex.getServiceName()
+                    + " when initializing a myRole mex.");
         }
     }
 
@@ -271,15 +275,18 @@
     }
 
     /**
-     * Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned.
-     * If the part is an XML schema typed part, then the name of the part is returned in the null namespace.
+     * Get the element name for a given WSDL part. If the part is an
+     * <em>element</em> part, the name of that element is returned. If the
+     * part is an XML schema typed part, then the name of the part is returned
+     * in the null namespace.
      * 
      * @param part
      *            WSDL {@link javax.wsdl.Part}
      * @return name of element containing said part
      */
     static QName getElementNameForPart(OMessageVarType.Part part) {
-        return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name);
+        return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null,
+                part.name);
     }
 
     /** Create a version-appropriate runtime context. */
@@ -293,10 +300,12 @@
      * 
      * @param mex
      *            message exchange
-     * @return <code>true</code> if execution should continue, <code>false</code> otherwise
+     * @return <code>true</code> if execution should continue,
+     *         <code>false</code> otherwise
      */
     private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
-        InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(),_pconf);
+        InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
+                getProcessDAO(), _pconf);
 
         for (MessageExchangeInterceptor i : _mexInterceptors)
             if (!mex.processInterceptor(i, mex, ictx, invoker))
@@ -310,7 +319,8 @@
     }
 
     /**
-     * Replacement object for serializtation of the {@link OBase} (compiled BPEL) objects in the JACOB VPU.
+     * Replacement object for serializtation of the {@link OBase} (compiled
+     * BPEL) objects in the JACOB VPU.
      */
     public static final class OBaseReplacementImpl implements Externalizable {
         private static final long serialVersionUID = 1L;
@@ -348,8 +358,8 @@
         }
 
         /**
-         * Get the initial value of this role's EPR. This value is obtained from the integration layer when the process is enabled
-         * on the server.
+         * Get the initial value of this role's EPR. This value is obtained from
+         * the integration layer when the process is enabled on the server.
          * 
          * @return initial epr
          */
@@ -389,7 +399,8 @@
          */
         public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
             if (__log.isTraceEnabled()) {
-                __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex }));
+                __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
+                        "messageExchange", mex }));
             }
 
             Operation operation = getMyRoleOperation(mex.getOperationName());
@@ -398,7 +409,7 @@
                 mex.setFailure(FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
                 return;
             }
-            
+
             mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
             mex.setPortOp(_plinkDef.myRolePortType, operation);
             mex.setPattern(operation.getOutput() == null ? MessageExchangePattern.REQUEST_ONLY
@@ -421,20 +432,26 @@
             MessageRouteDAO messageRoute = null;
 
             // We need to compute the correlation keys (based on the operation
-            // we can
-            // infer which correlation keys to compute - this is merely a set
+            // we can  infer which correlation keys to compute - this is merely a set
             // consisting of each correlationKey used in each correlation sets
-            // that is
-            // ever
-            // referenced in an <receive>/<onMessage> on this
+            // that is ever referenced in an <receive>/<onMessage> on this
             // partnerlink/operation.
-            keys = computeCorrelationKeys(mex);
+            try {
+                keys = computeCorrelationKeys(mex);
+            } catch (InvalidMessageException ime) {
+                // We'd like to do a graceful exit here, no sense in rolling back due to a 
+                // a message format problem. 
+                __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
+                mex.setFailure(FailureType.FORMAT_ERROR, ime.getMessage(), null);
+                return;
+            }
 
             String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
             String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
             if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
-                        + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
+                __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
+                        + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
+                        + " partnerSessionId=" + partnerSessionId);
             }
 
             CorrelationKey matchedKey = null;
@@ -476,9 +493,8 @@
                 BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), mex);
 
                 // send process instance event
-                NewProcessInstanceEvent evt = new NewProcessInstanceEvent(
-                        new QName(_oprocess.targetNamespace, _oprocess.getName()), getProcessDAO().getProcessId(), newInstance
-                                .getInstanceId());
+                NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_oprocess.targetNamespace,
+                        _oprocess.getName()), getProcessDAO().getProcessId(), newInstance.getInstanceId());
                 evt.setPortType(mex.getPortType().getQName());
                 evt.setOperation(operation.getName());
                 evt.setMexId(mex.getMessageExchangeId());
@@ -494,10 +510,6 @@
                             + messageRoute.getTargetInstance().getInstanceId());
                 }
 
-                // Attempt to acquire an instance-level lock.
-                // _lockManager.lock(messageRoute.getTargetInstance().getInstanceId(),
-                // 60, TimeUnit.SECONDS);
-
                 ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
 
                 // Reload process instance for DAO.
@@ -505,14 +517,12 @@
                 instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
 
                 // Kill the route so some new message does not get routed to
-                // same
-                // process
-                // instance.
+                // same process instance.
                 correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
 
                 // send process instance event
-                CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess.getName()),
-                        getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
+                CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess
+                        .getName()), getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
                 evt.setPortType(mex.getPortType().getQName());
                 evt.setOperation(operation.getName());
                 evt.setMexId(mex.getMessageExchangeId());
@@ -537,8 +547,8 @@
 
                 } else {
                     // send event
-                    CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex.getOperation()
-                            .getName(), mex.getMessageExchangeId(), keys);
+                    CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+                            .getOperation().getName(), mex.getMessageExchangeId(), keys);
 
                     evt.setProcessId(getProcessDAO().getProcessId());
                     evt.setProcessName(new QName(_oprocess.targetNamespace, _oprocess.getName()));
@@ -552,8 +562,10 @@
                 }
             }
 
-            // Now we have to update our message exchange status. If the <reply> was not hit during the
-            // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
+            // Now we have to update our message exchange status. If the <reply>
+            // was not hit during the
+            // invocation, then we will be in the "REQUEST" phase which means
+            // that either this was a one-way
             // or a two-way that needs to delivery the reply asynchronously.
             if (mex.getStatus() == Status.REQUEST) {
                 mex.setStatus(Status.ASYNC);
@@ -576,7 +588,8 @@
             Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation);
 
             for (OScope.CorrelationSet cset : csets) {
-                CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()), msg);
+                CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()),
+                        msg);
                 keys.add(key);
             }
 
@@ -588,7 +601,8 @@
             return keys.toArray(new CorrelationKey[keys.size()]);
         }
 
-        private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype, Element msg) {
+        private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
+                Element msg) {
             String[] values = new String[cset.properties.size()];
 
             int jIdx = 0;
@@ -599,8 +613,8 @@
                 if (alias == null) {
                     // TODO: Throw a real exception! And catch this at compile
                     // time.
-                    throw new IllegalArgumentException("No alias matching property '" + property.name + "' with message type '"
-                            + messagetype + "'");
+                    throw new IllegalArgumentException("No alias matching property '" + property.name
+                            + "' with message type '" + messagetype + "'");
                 }
 
                 String value;
@@ -635,7 +649,8 @@
                 __log.debug("Processing partner's response for partnerLink: " + messageExchange);
             }
 
-            BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
+            BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null,
+                    null);
             processInstance.invocationResponse(messageExchange);
             processInstance.execute();
         }
@@ -697,8 +712,8 @@
     }
 
     ProcessDAO getProcessDAO() {
-        return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : _engine._contexts.dao
-                .getConnection().getProcess(_pid);
+        return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid)
+                : _engine._contexts.dao.getConnection().getProcess(_pid);
     }
 
     static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -724,7 +739,8 @@
             myrole._initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, myrole._endpoint,
                     myrole._plinkDef.myRolePortType);
 
-            __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is " + myrole._initialEPR);
+            __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is "
+                    + myrole._initialEPR);
         }
 
         for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
@@ -736,12 +752,13 @@
                 prole._initialEPR = epr;
             }
 
-            __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " + prole._initialEPR);
+            __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
+                    + prole._initialEPR);
 
         }
 
         __log.debug("Activated " + _pid);
-        
+
         markused();
     }
 
@@ -805,8 +822,8 @@
     public long getLastUsed() {
         return _lastUsed;
     }
-    
-    /** Keep track of the time the process was last used. */ 
+
+    /** Keep track of the time the process was last used. */
     private final void markused() {
         _lastUsed = System.currentTimeMillis();
     }