You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by rr...@apache.org on 2009/09/09 21:16:52 UTC

svn commit: r813084 - in /ode/branches/APACHE_ODE_1.X: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ bpel-schemas/src/main/xsd/

Author: rr
Date: Wed Sep  9 19:16:51 2009
New Revision: 813084

URL: http://svn.apache.org/viewvc?rev=813084&view=rev
Log:
ODE-483: Instance replayer - added failure dispatching and a bugfix for replaying multiple instances at once

Modified:
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
    ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java Wed Sep  9 19:16:51 2009
@@ -34,6 +34,7 @@
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.engine.BpelEngineImpl;
 import org.apache.ode.bpel.iapi.BpelEngine;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 import org.apache.ode.bpel.pmapi.CommunicationType;
 import org.apache.ode.bpel.pmapi.ExchangeType;
@@ -125,6 +126,8 @@
         CommunicationType result = CommunicationType.Factory.newInstance();
         List<Exchange> list = new ArrayList<Exchange>();
         ProcessInstanceDAO instance = conn.getInstance(iid);
+        if (instance == null)
+            return result;
         result.setProcessType(instance.getProcess().getType());
 
         for (String mexId : instance.getMessageExchangeIds()) {
@@ -140,13 +143,18 @@
                 __log.error("", e1);
             }
             try {
-                if (mexDao.getResponse() != null) {
-                    if ("FAULT".equals(mexDao.getStatus())) {
-                        Fault f = e.addNewFault();
-                        f.setType(mexDao.getFault());
-                        f.setExplanation(mexDao.getFaultExplanation());
+                Status status = Status.valueOf(mexDao.getStatus());
+                if (status == Status.FAULT) {
+                    Fault f = e.addNewFault();
+                    f.setType(mexDao.getFault());
+                    f.setExplanation(mexDao.getFaultExplanation());
+                    if (mexDao.getResponse() != null) {
                         f.setMessage(XmlObject.Factory.parse(mexDao.getResponse().getData()));
-                    } else {
+                    }
+                } else if (status == Status.FAILURE) {
+                    e.addNewFailure().setExplanation(mexDao.getFaultExplanation());
+                } else {
+                    if (mexDao.getResponse() != null) {
                         e.setOut(XmlObject.Factory.parse(mexDao.getResponse().getData()));
                     }
                 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java Wed Sep  9 19:16:51 2009
@@ -85,7 +85,7 @@
     public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage, InvokeResponseChannel channel) throws FaultException {
         __log.debug("invoke");
 
-        Exchange answer = replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(), operation.getName());
+        Exchange answer = replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(), operation.getName(), outgoingMessage, getCurrentEventDateTime());
 
         PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
 
@@ -127,6 +127,9 @@
                 }
                 mexDao.setResponse(response);
                 mexDao.setStatus(Status.RESPONSE.toString());
+            } else if (answer.isSetFailure()) {
+                mexDao.setFaultExplanation(answer.getFailure().getExplanation());
+                mexDao.setStatus(Status.FAILURE.toString());
             } else {
                 // We don't have output for in-out operation - resulting with
                 // replayer error to the top
@@ -188,7 +191,7 @@
         MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput().getMessage().getQName());
         buildOutgoingMessage(message, msg);
 
-        __log.debug("reply mexRef:" + mexRef);
+        __log.debug("instance replied mexRef:" + mexRef + " " + DOMUtils.domToString(msg));
         mex.setResponse(message);
         mex.setStatus(Status.RESPONSE.toString());
     }
@@ -249,6 +252,7 @@
                     // Kill the route so some new message does not get routed to
                     // same process instance.
                     routing.correlator.removeRoutes(routing.messageRoute.getGroupId(), _dao);
+
                     execute();
                     return true;
                 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java Wed Sep  9 19:16:51 2009
@@ -41,6 +41,8 @@
 import org.apache.ode.bpel.pmapi.ExchangeType;
 import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
 import org.apache.ode.bpel.runtime.PROCESS;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
 
 /**
  * Context holding replayer state (eg. invoke answers) for single instance during replaying.
@@ -77,14 +79,14 @@
             v.answers.add(e);
         }
 
-        public Exchange fetchAnswer(QName service, String operation) {
+        public Exchange fetchAnswer(QName service, String operation, Element outgointMessage, Date currentEventDateTime) {
             __log.debug("fetching answer for " + service + " " + operation);
             String key = getAnswersKey(service, operation);
             AnswersForKey v = answersMap.get(key);
-            if (v == null) {
-                throw new IllegalStateException("answer for " + service + " " + operation + " not found");
+            Exchange e = v == null ? null : v.answerPos < v.answers.size() ? v.answers.get(v.answerPos) : null;
+            if (e == null) {
+                throw new IllegalStateException("answer for " + service + " " + operation + " at time " + currentEventDateTime + " not found, outgoing message was " + DOMUtils.domToString(outgointMessage));
             }
-            Exchange e = v.answers.get(v.answerPos);
             v.answerPos++;
             __log.debug("fetched " + e);
             return e;
@@ -129,50 +131,67 @@
         }, time, runtimeContext);
     }
 
-    public void init(CommunicationType r, ReplayerScheduler scheduler) throws Exception {
+    public void init(final CommunicationType r, ReplayerScheduler scheduler) throws Exception {
         this.scheduler = scheduler;
-        List<Exchange> exchangeList = r.getExchangeList();
+        final List<Exchange> exchangeList = r.getExchangeList();
 
         for (int i = 1; i < exchangeList.size(); i++) {
             Exchange e = exchangeList.get(i);
-            if (e.getType() == ExchangeType.P) {
+            // We skip failures, because INVOKE_CHECK job is not handled by
+            // replayer
+            if (e.getType() == ExchangeType.P && !e.isSetFailure()) {
                 answers.add(e);
             }
         }
 
         {
             final Exchange e = exchangeList.get(0);
-            final BpelProcess p = bpelEngine.getNewestProcessByType(r.getProcessType());
-            final ProcessDAO processDAO = p.getProcessDAO();
-            final MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
-
-            p.invokeProcess(mex, new BpelProcess.InvokeHandler() {
-                public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
-                    if (routing.messageRoute == null && createInstance) {
-                        ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
-
-                        runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex, ReplayerContext.this);
-                        runtimeContext.setCurrentEventDateTime(e.getCreateTime().getTime());
-                        runtimeContext.updateMyRoleMex(mex);
-                        // first receive is matched to provided mex
-                        runtimeContext.execute();
-                        return true;
-                    } else if (routing.messageRoute != null) {
-                        throw new IllegalStateException("Instantiating mex causes invocation of existing instance " + mex);
+
+            final Date time = e.getCreateTime().getTime();
+            scheduler.scheduleReplayerJob(new Callable<Void>() {
+                public Void call() throws Exception {
+                    __log.debug("initial call " + e);
+
+                    final BpelProcess p = bpelEngine.getNewestProcessByType(r.getProcessType());
+                    final ProcessDAO processDAO = p.getProcessDAO();
+                    final MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
+
+                    p.invokeProcess(mex,
+                    // time,
+                            new BpelProcess.InvokeHandler() {
+                                public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
+                                    if (routing.messageRoute == null && createInstance) {
+                                        ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
+
+                                        runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex,
+                                        // time,
+                                                ReplayerContext.this);
+                                        runtimeContext.setCurrentEventDateTime(time);
+                                        runtimeContext.updateMyRoleMex(mex);
+                                        // first receive is matched to provided
+                                        // mex
+                                        runtimeContext.execute();
+                                        return true;
+                                    } else if (routing.messageRoute != null) {
+                                        throw new IllegalStateException("Instantiating mex causes invocation of existing instance " + mex);
+                                    }
+                                    return false;
+                                }
+                            });
+
+                    for (int i = 1; i < exchangeList.size(); i++) {
+                        Exchange e2 = exchangeList.get(i);
+                        if (e2.getType() == ExchangeType.M) {
+                            MyRoleMessageExchangeImpl mex2 = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e2, bpelEngine);
+                            runtimeContext.updateMyRoleMex(mex2);
+                            scheduleInvoke(e2, mex2);
+                        }
                     }
-                    return false;
+                    return null;
                 }
-            });
+            }, time, null);
         }
 
-        for (int i = 1; i < exchangeList.size(); i++) {
-            Exchange e = exchangeList.get(i);
-            if (e.getType() == ExchangeType.M) {
-                MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
-                runtimeContext.updateMyRoleMex(mex);
-                scheduleInvoke(e, mex);
-            }
-        }
     }
 
     public void run() throws Exception {

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java Wed Sep  9 19:16:51 2009
@@ -92,7 +92,9 @@
         while (!taskQueue.isEmpty()) {
             TaskElement taskElement = taskQueue.remove();
             __log.debug("executing action at time " + taskElement.when);
-            taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+            if (taskElement.runtimeContext != null) {
+                taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+            }
             taskElement.action.call();
         }
     }

Modified: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd Wed Sep  9 19:16:51 2009
@@ -48,65 +48,73 @@
     </simpleType>
     
     <complexType name="GetCommunication">
-    	<xs:sequence>
-    		<xs:element name="iid" type="long" maxOccurs="unbounded"></xs:element>
-    	</xs:sequence>
+        <xs:sequence>
+            <xs:element name="iid" type="long" maxOccurs="unbounded"></xs:element>
+        </xs:sequence>
     </complexType>
     
-   	<xs:complexType name="GetCommunicationResponse">
-   		<xs:sequence>
-        	<element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
-   		</xs:sequence>
-   	</xs:complexType>
+    <xs:complexType name="GetCommunicationResponse">
+        <xs:sequence>
+            <element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
+        </xs:sequence>
+    </xs:complexType>
 
     <element name="getCommunicationResponse" type="pmapi:GetCommunicationResponse"/>
 
     <complexType name="CommunicationType">
-    	<sequence>
-    		<element name="processType" type="QName" />
+        <sequence>
+            <element name="processType" type="QName" />
 
-    		<element name="serviceConfig" maxOccurs="unbounded">
-    			<complexType>
-    				<sequence>
-    					<element name="name" type="QName" />
-    					<element name="replayingType"
-    						type="pmapi:ReplayType" />
-    				</sequence>
-    			</complexType>
-    		</element>
+            <element name="serviceConfig" maxOccurs="unbounded">
+                <complexType>
+                    <sequence>
+                        <element name="name" type="QName" />
+                        <element name="replayingType"
+                            type="pmapi:ReplayType" />
+                    </sequence>
+                </complexType>
+            </element>
 
-    		<element name="exchange" maxOccurs="unbounded">
-    			<complexType>
-    				<sequence>
-    					<element name="type" type="pmapi:ExchangeType" />
-    					<element name="createTime" type="dateTime" />
-    					<element name="service" type="QName" />
-    					<element name="operation" type="string" />
-    					<element name="in" type="anyType" />
-    					<element name="out" type="anyType"
-    						minOccurs="0" />
-    					<element name="fault" minOccurs="0">
-    						<complexType>
-    							<sequence>
-    								<element name="type" type="QName" />
-    								<element name="explanation"
-    									type="string" />
-    								<element name="message"
-    									type="anyType" />
-    							</sequence>
-    						</complexType>
-    					</element>
-    				</sequence>
-    			</complexType>
-    		</element>
-    	</sequence>
+            <element name="exchange" maxOccurs="unbounded">
+                <complexType>
+                    <sequence>
+                        <element name="type" type="pmapi:ExchangeType" />
+                        <element name="createTime" type="dateTime" />
+                        <element name="service" type="QName" />
+                        <element name="operation" type="string" />
+                        <element name="in" type="anyType" />
+                        <choice>
+                            <element name="out" type="anyType"/>
+                            <element name="fault">
+                                <complexType>
+                                    <sequence>
+                                        <element name="type" type="QName" />
+                                        <element name="explanation"
+                                            type="string" />
+                                        <element name="message"
+                                            type="anyType" />
+                                    </sequence>
+                                </complexType>
+                            </element>
+                            <element name="failure">
+                                <complexType>
+                                    <sequence>
+                                        <element name="explanation" type="string" />
+                                    </sequence>
+                                </complexType>
+                           </element>
+                        </choice>
+                    </sequence>
+                </complexType>
+            </element>
+        </sequence>
     </complexType>
 
     <complexType name="Replay">
         <sequence>
-        	<element name="upgradeInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
-        	<element name="replaceInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
-        	<element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
+            <element name="upgradeInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
+            <element name="replaceInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
+            <element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
         </sequence>
     </complexType>