You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by rr...@apache.org on 2009/09/09 21:16:52 UTC
svn commit: r813084 - in /ode/branches/APACHE_ODE_1.X:
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/
bpel-schemas/src/main/xsd/
Author: rr
Date: Wed Sep 9 19:16:51 2009
New Revision: 813084
URL: http://svn.apache.org/viewvc?rev=813084&view=rev
Log:
ODE-483: Instance replayer - added failure dispatching and a bugfix for replaying multiple instances at once
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java Wed Sep 9 19:16:51 2009
@@ -34,6 +34,7 @@
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.engine.BpelEngineImpl;
import org.apache.ode.bpel.iapi.BpelEngine;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.pmapi.CommunicationType;
import org.apache.ode.bpel.pmapi.ExchangeType;
@@ -125,6 +126,8 @@
CommunicationType result = CommunicationType.Factory.newInstance();
List<Exchange> list = new ArrayList<Exchange>();
ProcessInstanceDAO instance = conn.getInstance(iid);
+ if (instance == null)
+ return result;
result.setProcessType(instance.getProcess().getType());
for (String mexId : instance.getMessageExchangeIds()) {
@@ -140,13 +143,18 @@
__log.error("", e1);
}
try {
- if (mexDao.getResponse() != null) {
- if ("FAULT".equals(mexDao.getStatus())) {
- Fault f = e.addNewFault();
- f.setType(mexDao.getFault());
- f.setExplanation(mexDao.getFaultExplanation());
+ Status status = Status.valueOf(mexDao.getStatus());
+ if (status == Status.FAULT) {
+ Fault f = e.addNewFault();
+ f.setType(mexDao.getFault());
+ f.setExplanation(mexDao.getFaultExplanation());
+ if (mexDao.getResponse() != null) {
f.setMessage(XmlObject.Factory.parse(mexDao.getResponse().getData()));
- } else {
+ }
+ } else if (status == Status.FAILURE) {
+ e.addNewFailure().setExplanation(mexDao.getFaultExplanation());
+ } else {
+ if (mexDao.getResponse() != null) {
e.setOut(XmlObject.Factory.parse(mexDao.getResponse().getData()));
}
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java Wed Sep 9 19:16:51 2009
@@ -85,7 +85,7 @@
public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage, InvokeResponseChannel channel) throws FaultException {
__log.debug("invoke");
- Exchange answer = replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(), operation.getName());
+ Exchange answer = replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(), operation.getName(), outgoingMessage, getCurrentEventDateTime());
PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
@@ -127,6 +127,9 @@
}
mexDao.setResponse(response);
mexDao.setStatus(Status.RESPONSE.toString());
+ } else if (answer.isSetFailure()) {
+ mexDao.setFaultExplanation(answer.getFailure().getExplanation());
+ mexDao.setStatus(Status.FAILURE.toString());
} else {
// We don't have output for in-out operation - resulting with
// replayer error to the top
@@ -188,7 +191,7 @@
MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput().getMessage().getQName());
buildOutgoingMessage(message, msg);
- __log.debug("reply mexRef:" + mexRef);
+ __log.debug("instance replied mexRef:" + mexRef + " " + DOMUtils.domToString(msg));
mex.setResponse(message);
mex.setStatus(Status.RESPONSE.toString());
}
@@ -249,6 +252,7 @@
// Kill the route so some new message does not get routed to
// same process instance.
routing.correlator.removeRoutes(routing.messageRoute.getGroupId(), _dao);
+
execute();
return true;
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java Wed Sep 9 19:16:51 2009
@@ -41,6 +41,8 @@
import org.apache.ode.bpel.pmapi.ExchangeType;
import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
import org.apache.ode.bpel.runtime.PROCESS;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
/**
* Context holding replayer state (eg. invoke answers) for single instance during replaying.
@@ -77,14 +79,14 @@
v.answers.add(e);
}
- public Exchange fetchAnswer(QName service, String operation) {
+ public Exchange fetchAnswer(QName service, String operation, Element outgointMessage, Date currentEventDateTime) {
__log.debug("fetching answer for " + service + " " + operation);
String key = getAnswersKey(service, operation);
AnswersForKey v = answersMap.get(key);
- if (v == null) {
- throw new IllegalStateException("answer for " + service + " " + operation + " not found");
+ Exchange e = v == null ? null : v.answerPos < v.answers.size() ? v.answers.get(v.answerPos) : null;
+ if (e == null) {
+ throw new IllegalStateException("answer for " + service + " " + operation + " at time " + currentEventDateTime + " not found, outgoing message was " + DOMUtils.domToString(outgointMessage));
}
- Exchange e = v.answers.get(v.answerPos);
v.answerPos++;
__log.debug("fetched " + e);
return e;
@@ -129,50 +131,67 @@
}, time, runtimeContext);
}
- public void init(CommunicationType r, ReplayerScheduler scheduler) throws Exception {
+ public void init(final CommunicationType r, ReplayerScheduler scheduler) throws Exception {
this.scheduler = scheduler;
- List<Exchange> exchangeList = r.getExchangeList();
+ final List<Exchange> exchangeList = r.getExchangeList();
for (int i = 1; i < exchangeList.size(); i++) {
Exchange e = exchangeList.get(i);
- if (e.getType() == ExchangeType.P) {
+ // We skip failures, because INVOKE_CHECK job is not handled by
+ // replayer
+ if (e.getType() == ExchangeType.P && !e.isSetFailure()) {
answers.add(e);
}
}
{
final Exchange e = exchangeList.get(0);
- final BpelProcess p = bpelEngine.getNewestProcessByType(r.getProcessType());
- final ProcessDAO processDAO = p.getProcessDAO();
- final MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
-
- p.invokeProcess(mex, new BpelProcess.InvokeHandler() {
- public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
- if (routing.messageRoute == null && createInstance) {
- ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
-
- runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex, ReplayerContext.this);
- runtimeContext.setCurrentEventDateTime(e.getCreateTime().getTime());
- runtimeContext.updateMyRoleMex(mex);
- // first receive is matched to provided mex
- runtimeContext.execute();
- return true;
- } else if (routing.messageRoute != null) {
- throw new IllegalStateException("Instantiating mex causes invocation of existing instance " + mex);
+
+ final Date time = e.getCreateTime().getTime();
+ scheduler.scheduleReplayerJob(new Callable<Void>() {
+ public Void call() throws Exception {
+ __log.debug("initial call " + e);
+
+ final BpelProcess p = bpelEngine.getNewestProcessByType(r.getProcessType());
+ final ProcessDAO processDAO = p.getProcessDAO();
+ final MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
+
+ p.invokeProcess(mex,
+ // time,
+ new BpelProcess.InvokeHandler() {
+ public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
+ if (routing.messageRoute == null && createInstance) {
+ ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
+
+ runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex,
+ // time,
+ ReplayerContext.this);
+ runtimeContext.setCurrentEventDateTime(time);
+ runtimeContext.updateMyRoleMex(mex);
+ // first receive is matched to provided
+ // mex
+ runtimeContext.execute();
+ return true;
+ } else if (routing.messageRoute != null) {
+ throw new IllegalStateException("Instantiating mex causes invocation of existing instance " + mex);
+ }
+ return false;
+ }
+ });
+
+ for (int i = 1; i < exchangeList.size(); i++) {
+ Exchange e2 = exchangeList.get(i);
+ if (e2.getType() == ExchangeType.M) {
+ MyRoleMessageExchangeImpl mex2 = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e2, bpelEngine);
+ runtimeContext.updateMyRoleMex(mex2);
+ scheduleInvoke(e2, mex2);
+ }
}
- return false;
+ return null;
}
- });
+ }, time, null);
}
- for (int i = 1; i < exchangeList.size(); i++) {
- Exchange e = exchangeList.get(i);
- if (e.getType() == ExchangeType.M) {
- MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
- runtimeContext.updateMyRoleMex(mex);
- scheduleInvoke(e, mex);
- }
- }
}
public void run() throws Exception {
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java Wed Sep 9 19:16:51 2009
@@ -92,7 +92,9 @@
while (!taskQueue.isEmpty()) {
TaskElement taskElement = taskQueue.remove();
__log.debug("executing action at time " + taskElement.when);
- taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+ if (taskElement.runtimeContext != null) {
+ taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+ }
taskElement.action.call();
}
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd Wed Sep 9 19:16:51 2009
@@ -48,65 +48,73 @@
</simpleType>
<complexType name="GetCommunication">
- <xs:sequence>
- <xs:element name="iid" type="long" maxOccurs="unbounded"></xs:element>
- </xs:sequence>
+ <xs:sequence>
+ <xs:element name="iid" type="long" maxOccurs="unbounded"></xs:element>
+ </xs:sequence>
</complexType>
- <xs:complexType name="GetCommunicationResponse">
- <xs:sequence>
- <element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
- </xs:sequence>
- </xs:complexType>
+ <xs:complexType name="GetCommunicationResponse">
+ <xs:sequence>
+ <element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
+ </xs:sequence>
+ </xs:complexType>
<element name="getCommunicationResponse" type="pmapi:GetCommunicationResponse"/>
<complexType name="CommunicationType">
- <sequence>
- <element name="processType" type="QName" />
+ <sequence>
+ <element name="processType" type="QName" />
- <element name="serviceConfig" maxOccurs="unbounded">
- <complexType>
- <sequence>
- <element name="name" type="QName" />
- <element name="replayingType"
- type="pmapi:ReplayType" />
- </sequence>
- </complexType>
- </element>
+ <element name="serviceConfig" maxOccurs="unbounded">
+ <complexType>
+ <sequence>
+ <element name="name" type="QName" />
+ <element name="replayingType"
+ type="pmapi:ReplayType" />
+ </sequence>
+ </complexType>
+ </element>
- <element name="exchange" maxOccurs="unbounded">
- <complexType>
- <sequence>
- <element name="type" type="pmapi:ExchangeType" />
- <element name="createTime" type="dateTime" />
- <element name="service" type="QName" />
- <element name="operation" type="string" />
- <element name="in" type="anyType" />
- <element name="out" type="anyType"
- minOccurs="0" />
- <element name="fault" minOccurs="0">
- <complexType>
- <sequence>
- <element name="type" type="QName" />
- <element name="explanation"
- type="string" />
- <element name="message"
- type="anyType" />
- </sequence>
- </complexType>
- </element>
- </sequence>
- </complexType>
- </element>
- </sequence>
+ <element name="exchange" maxOccurs="unbounded">
+ <complexType>
+ <sequence>
+ <element name="type" type="pmapi:ExchangeType" />
+ <element name="createTime" type="dateTime" />
+ <element name="service" type="QName" />
+ <element name="operation" type="string" />
+ <element name="in" type="anyType" />
+ <choice>
+ <element name="out" type="anyType"/>
+ <element name="fault">
+ <complexType>
+ <sequence>
+ <element name="type" type="QName" />
+ <element name="explanation"
+ type="string" />
+ <element name="message"
+ type="anyType" />
+ </sequence>
+ </complexType>
+ </element>
+ <element name="failure">
+ <complexType>
+ <sequence>
+ <element name="explanation" type="string" />
+ </sequence>
+ </complexType>
+ </element>
+ </choice>
+ </sequence>
+ </complexType>
+ </element>
+ </sequence>
</complexType>
<complexType name="Replay">
<sequence>
- <element name="upgradeInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
- <element name="replaceInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
- <element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
+ <element name="upgradeInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
+ <element name="replaceInstance" minOccurs="0" maxOccurs="unbounded" type="long"/>
+ <element name="restoreInstance" minOccurs="0" maxOccurs="unbounded" type="pmapi:CommunicationType"/>
</sequence>
</complexType>