You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/07/20 01:57:42 UTC
svn commit: r423668 [1/2] - in /incubator/ode/scratch/pxe-iapi:
axis2/src/main/java/com/fs/pxe/axis2/
axis2/src/main/java/com/fs/pxe/axis2/hooks/
bpel-api/src/main/java/com/fs/pxe/bpel/dao/
bpel-api/src/main/java/com/fs/pxe/bpel/epr/ bpel-api/src/main/...
Author: mriou
Date: Wed Jul 19 16:57:40 2006
New Revision: 423668
URL: http://svn.apache.org/viewvc?rev=423668&view=rev
Log:
Implemented the usage of process sessions (with implicit correlation) using Axis2. Several fixes in message exchanges state management.
Added:
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionInHandler.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionOutHandler.java
Modified:
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/AxisInvoker.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/DeploymentUnit.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/ExternalService.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/MessageExchangeContextImpl.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/Messages.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEServer.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEService.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/SOAPUtils.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEAxisDispatcher.java
incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEMessageReceiver.java
incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/dao/MessageExchangeDAO.java
incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSAEndpoint.java
incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSDL20Endpoint.java
incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/Message.java
incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MessageExchange.java
incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MyRoleMessageExchange.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelServerImpl.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageExchangeImpl.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageImpl.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/PartnerRoleMessageExchangeImpl.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/ProcessDDInitializer.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/ASSIGN.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/BpelRuntimeContext.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/EH_EVENT.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/INVOKE.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/PICK.java
incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/PropertyAliasEvaluationContext.java
incubator/ode/scratch/pxe-iapi/dao-hibernate/src/main/java/com/fs/pxe/daohib/bpel/MessageExchangeDaoImpl.java
incubator/ode/scratch/pxe-iapi/dao-hibernate/src/main/java/com/fs/pxe/daohib/bpel/hobj/HMessageExchange.java
incubator/ode/scratch/pxe-iapi/utils/src/main/java/com/fs/utils/Namespaces.java
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/AxisInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/AxisInvoker.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/AxisInvoker.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/AxisInvoker.java Wed Jul 19 16:57:40 2006
@@ -40,7 +40,7 @@
com.fs.pxe.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
try {
Document doc = DOMUtils.newDocument();
- // TODO handle message style
+
Element op = doc.createElement(pxeMex.getOperationName());
op.appendChild(doc.importNode(DOMUtils.getFirstChildElement(pxeMex.getRequest().getMessage()), true));
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/DeploymentUnit.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/DeploymentUnit.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/DeploymentUnit.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/DeploymentUnit.java Wed Jul 19 16:57:40 2006
@@ -104,9 +104,8 @@
}
} catch (AxisFault axisFault) {
throw new DeploymentException("Service deployment in Axis2 failed!", axisFault);
- } catch (IOException e) {
- // Highly unexpected
- e.printStackTrace();
+ } catch (Throwable e) {
+
}
}
_lastModified = new File(_duDirectory, "deploy.xml").lastModified();
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/ExternalService.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/ExternalService.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/ExternalService.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/ExternalService.java Wed Jul 19 16:57:40 2006
@@ -1,26 +1,26 @@
package com.fs.pxe.axis2;
-import com.fs.pxe.bpel.iapi.PartnerRoleMessageExchange;
-import com.fs.pxe.bpel.iapi.MessageExchange;
-import com.fs.pxe.bpel.iapi.Message;
import com.fs.pxe.bpel.epr.MutableEndpoint;
+import com.fs.pxe.bpel.iapi.Message;
+import com.fs.pxe.bpel.iapi.MessageExchange;
+import com.fs.pxe.bpel.iapi.PartnerRoleMessageExchange;
import com.fs.utils.DOMUtils;
-
-import javax.wsdl.Definition;
-import javax.xml.namespace.QName;
-
-import org.w3c.dom.Element;
import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.w3c.dom.Element;
-import java.util.concurrent.Future;
+import javax.wsdl.Definition;
+import javax.xml.namespace.QName;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/**
* Acts as a service not provided by PXE. Used mainly for invocation as a way to
@@ -35,13 +35,15 @@
private Definition _definition;
private QName _serviceName;
private String _portName;
+ private AxisConfiguration _axisConfig;
public ExternalService(Definition definition, QName serviceName,
- String portName, ExecutorService executorService) {
+ String portName, ExecutorService executorService, AxisConfiguration axisConfig) {
_definition = definition;
_serviceName = serviceName;
_portName = portName;
_executorService = executorService;
+ _axisConfig = axisConfig;
}
public void invoke(final PartnerRoleMessageExchange pxeMex) {
@@ -59,8 +61,14 @@
__log.debug("Message: " + payload);
options.setTo(axisEPR);
- final ServiceClient serviceClient = new ServiceClient();
+ ConfigurationContext ctx = new ConfigurationContext(_axisConfig);
+ final ServiceClient serviceClient = new ServiceClient(ctx, null);
serviceClient.setOptions(options);
+ // Override options are passed to the axis MessageContext so we can
+ // retrieve them in our session out handler.
+ Options mexOptions = new Options();
+ writeHeader(mexOptions, pxeMex);
+ serviceClient.setOverrideOptions(mexOptions);
if (isTwoWay) {
// Invoking in a separate thread even though we're supposed to wait for a synchronous reply
@@ -87,7 +95,7 @@
response.setMessage(OMUtils.toDOM(reply));
pxeMex.reply(response);
} else
- serviceClient.sendRobust(payload);
+ serviceClient.fireAndForget(payload);
} catch (AxisFault axisFault) {
String errmsg = "Error sending message to Axis2 for PXE mex " + pxeMex;
__log.error(errmsg, axisFault);
@@ -95,4 +103,18 @@
}
}
+
+ /**
+ * Extracts endpoint information from PXE message exchange to stuff them into
+ * Axis MessageContext.
+ */
+ private void writeHeader(Options options, PartnerRoleMessageExchange pxeMex) {
+ if (pxeMex.getEndpointReference() != null) {
+ options.setProperty("targetSessionEndpoint", pxeMex.getEndpointReference());
+ }
+ if (pxeMex.getCallbackEndpointReference() != null) {
+ options.setProperty("callbackSessionEndpoint", pxeMex.getCallbackEndpointReference());
+ }
+ }
+
}
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/MessageExchangeContextImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/MessageExchangeContextImpl.java Wed Jul 19 16:57:40 2006
@@ -8,6 +8,7 @@
import com.fs.pxe.bpel.iapi.EndpointReference;
import com.fs.pxe.bpel.epr.WSDL11Endpoint;
import com.fs.pxe.bpel.epr.EndpointFactory;
+import com.fs.pxe.bpel.epr.WSAEndpoint;
import com.fs.utils.Namespaces;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,11 +37,15 @@
__log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName());
EndpointReference epr = partnerRoleMessageExchange.getEndpointReference();
- // We only invoke with WSDL 1.1 service elements, that makes our life easier
- if (!(epr instanceof WSDL11Endpoint))
- epr = EndpointFactory.convert(new QName(Namespaces.WSDL_11, "service"), epr.toXML().getDocumentElement());
+ // We only invoke with WSA endpoints, that makes our life easier
+ if (!(epr instanceof WSAEndpoint))
+ epr = EndpointFactory.convert(new QName(Namespaces.WS_ADDRESSING_NS, "EndpointReference"),
+ epr.toXML().getDocumentElement());
// It's now safe to cast
- ExternalService service = _server.getExternalService(((WSDL11Endpoint)epr).getServiceName());
+ QName serviceName = ((WSAEndpoint)epr).getServiceName();
+ if (__log.isDebugEnabled())
+ __log.debug("The service to invoke is the external service " + serviceName);
+ ExternalService service = _server.getExternalService(serviceName);
service.invoke(partnerRoleMessageExchange);
}
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/Messages.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/Messages.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/Messages.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/Messages.java Wed Jul 19 16:57:40 2006
@@ -21,7 +21,7 @@
}
public String msgPxeInitHibernatePropertiesNotFound(File expected) {
- return format("Hibernate configuration file \"{0}\" not found!", expected);
+ return format("Hibernate configuration file \"{0}\" not foun, defaults will be used.!", expected);
}
public String msgPxeUsingExternalDb(String dbDataSource) {
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEServer.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEServer.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEServer.java Wed Jul 19 16:57:40 2006
@@ -171,6 +171,7 @@
// We're public!
_axisConfig.addService(axisService);
+ __log.debug("Created Axis2 service " + serviceName);
} catch (AxisFault axisFault) {
// TODO do something!
axisFault.printStackTrace();
@@ -180,8 +181,9 @@
public void createExternalService(Definition4BPEL def, QName serviceName, String portName) {
if (_externalServices.get(serviceName) != null) return;
- ExternalService extService = new ExternalService(def, serviceName, portName, _executorService);
+ ExternalService extService = new ExternalService(def, serviceName, portName, _executorService, _axisConfig);
_externalServices.put(serviceName, extService);
+ __log.debug("Created external service " + serviceName);
}
public void destroyService(QName serviceName) {
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEService.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEService.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/PXEService.java Wed Jul 19 16:57:40 2006
@@ -4,27 +4,26 @@
import com.fs.pxe.bpel.iapi.Message;
import com.fs.pxe.bpel.iapi.MyRoleMessageExchange;
import com.fs.pxe.bpel.iapi.MessageExchange;
+import com.fs.pxe.bpel.epr.WSAEndpoint;
import com.fs.pxe.bom.wsdl.Definition4BPEL;
import com.fs.utils.DOMUtils;
-import org.apache.axiom.om.OMElement;
+import com.fs.utils.GUID;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import javax.transaction.TransactionManager;
-import javax.wsdl.Part;
import javax.wsdl.Definition;
import javax.xml.namespace.QName;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
@@ -55,60 +54,43 @@
public void onAxisMessageExchange(MessageContext msgContext, MessageContext outMsgContext,
SOAPFactory soapFactory) throws AxisFault {
- boolean success = false;
+ boolean success = true;
MyRoleMessageExchange pxeMex = null;
+ ResponseCallback callback = null;
try {
_txManager.begin();
- pxeMex = _server.getEngine().createMessageExchange(
- msgContext.getMessageID(),
+ // Creating mesage exchange
+ String messageId = new GUID().toString();
+ pxeMex = _server.getEngine().createMessageExchange(""+messageId,
new QName(msgContext.getAxisService().getTargetNamespace(),
- msgContext.getAxisService().getName()), null,
+ msgContext.getAxisService().getName()), null,
msgContext.getAxisOperation().getName().getLocalPart());
+
if (pxeMex.getOperation() != null) {
+ // Preparing message to send to PXE
Message pxeRequest = pxeMex.createMessage(pxeMex.getOperation().getInput().getMessage().getQName());
Element msgContent = SOAPUtils.unwrap(OMUtils.toDOM(msgContext.getEnvelope().getBody().getFirstElement()),
_wsdlDef, pxeMex.getOperation().getInput().getMessage(), _serviceName);
+ readHeader(msgContext, pxeMex);
pxeRequest.setMessage(msgContent);
// Preparing a callback just in case we would need one.
- ResponseCallback callback = null;
if (pxeMex.getOperation().getOutput() != null) {
callback = new ResponseCallback();
- _waitingCallbacks.put(pxeMex.getMessageExchangeId(), callback);
+ _waitingCallbacks.put(pxeMex.getClientId(), callback);
}
if (__log.isDebugEnabled()) {
__log.debug("Invoking PXE using MEX " + pxeMex);
__log.debug("Message content: " + DOMUtils.domToString(pxeRequest.getMessage()));
}
+ // Invoking PXE
pxeMex.invoke(pxeRequest);
-
- boolean timeout = false;
- // Invocation response could be delayed, if so we have to wait for it.
- if (pxeMex.getStatus() == MessageExchange.Status.ASYNC) {
- MyRoleMessageExchange tmpMex = callback.getResponse(TIMEOUT);
- if (tmpMex == null) timeout = true;
- } else {
- // Callback wasn't necessary, cleaning up
- _waitingCallbacks.remove(pxeMex.getMessageExchangeId());
- }
-
- SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
- outMsgContext.setEnvelope(envelope);
-
- // Hopefully we have a response
- __log.debug("Handling response for MEX " + pxeMex);
- if (timeout) {
- __log.error("Timeout when waiting for response to MEX " + pxeMex);
- } else {
- onResponse(pxeMex, envelope);
- success = true;
- }
- } else {
- __log.error("PXE MEX " + pxeMex + " was unroutable.");
}
} catch(Exception e) {
+ e.printStackTrace();
+ success = false;
throw new AxisFault("An exception occured when invoking PXE.", e);
} finally {
if (success) {
@@ -126,28 +108,72 @@
throw new AxisFault("Rollback failed!", e);
}
}
+
+ if (pxeMex.getOperation() != null) {
+ boolean timeout = false;
+ // Invocation response could be delayed, if so we have to wait for it.
+ if (pxeMex.getStatus() == MessageExchange.Status.ASYNC) {
+ pxeMex = callback.getResponse(TIMEOUT);
+ if (pxeMex == null) timeout = true;
+ } else {
+ // Callback wasn't necessary, cleaning up
+ _waitingCallbacks.remove(pxeMex.getMessageExchangeId());
+ }
+
+ if (outMsgContext != null) {
+ SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+ outMsgContext.setEnvelope(envelope);
+
+ // Hopefully we have a response
+ __log.debug("Handling response for MEX " + pxeMex);
+ if (timeout) {
+ __log.error("Timeout when waiting for response to MEX " + pxeMex);
+ success = false;
+ } else {
+ try {
+ _txManager.begin();
+ onResponse(pxeMex, outMsgContext);
+ } catch (Exception e) {
+ try {
+ _txManager.rollback();
+ } catch (Exception ex) {
+ throw new AxisFault("Rollback failed!", ex);
+ }
+ throw new AxisFault("An exception occured when invoking PXE.", e);
+ } finally {
+ try {
+ _txManager.commit();
+ } catch (Exception e) {
+ throw new AxisFault("Commit failed!", e);
+ }
+ }
+ }
+ }
+ }
}
- if (!success) throw new AxisFault("Message was unroutable!");
+ if (!success) throw new AxisFault("Message was either unroutable or timed out!");
}
public void notifyResponse(MyRoleMessageExchange mex) {
- ResponseCallback callback = _waitingCallbacks.get(mex.getMessageExchangeId());
+ ResponseCallback callback = _waitingCallbacks.get(mex.getClientId());
if (callback == null) {
__log.error("No active service for message exchange: " + mex);
} else {
callback.onResponse(mex);
- _waitingCallbacks.remove(mex.getMessageExchangeId());
+ _waitingCallbacks.remove(mex.getClientId());
}
}
- private void onResponse(MyRoleMessageExchange mex, SOAPEnvelope envelope) throws AxisFault {
+ private void onResponse(MyRoleMessageExchange mex, MessageContext msgContext) throws AxisFault {
switch (mex.getStatus()) {
case FAULT:
throw new AxisFault(null, mex.getFault(), null, null, OMUtils.toOM(mex.getFaultResponse().getMessage()));
+ case ASYNC:
case RESPONSE:
Element response = SOAPUtils.wrap(mex.getResponse().getMessage(), _wsdlDef, _serviceName,
mex.getOperation(), mex.getOperation().getOutput().getMessage());
- envelope.getBody().addChild(OMUtils.toOM(response));
+ msgContext.getEnvelope().getBody().addChild(OMUtils.toOM(response));
+ writeHeader(msgContext, mex);
break;
case FAILURE:
// TODO: get failure codes out of the message.
@@ -157,44 +183,37 @@
}
}
- // TODO Handle messages style
- private void convertMessage(javax.wsdl.Message msgdef, Message dest, OMElement body) throws AxisFault {
- Element srcel = OMUtils.toDOM(body);
-
- Document pxemsgdoc = DOMUtils.newDocument();
- Element pxemsg = pxemsgdoc.createElement("message");
- pxemsgdoc.appendChild(pxemsg);
-
- List<Part> expectedParts = msgdef.getOrderedParts(null);
-
- Element srcpart = DOMUtils.getFirstChildElement(srcel);
- for (Part pdef : expectedParts) {
- Element p = pxemsgdoc.createElement(pdef.getName());
- pxemsg.appendChild(p);
- if (srcpart != null) {
- NodeList nl = srcpart.getChildNodes();
- for (int j = 0; j < nl.getLength(); ++j)
- p.appendChild(pxemsgdoc.importNode(nl.item(j), true));
- srcpart = DOMUtils.getNextSiblingElement(srcpart);
- } else {
- __log.error("Improperly formatted message, missing part: " + pdef.getName());
- }
+ /**
+ * Extracts endpoint information from Axis MessageContext (taken from WSA headers) to
+ * stuff them into PXE mesage exchange.
+ */
+ private void readHeader(MessageContext msgContext, MyRoleMessageExchange pxeMex) {
+ Object otse = msgContext.getProperty("targetSessionEndpoint");
+ Object ocse = msgContext.getProperty("callbackSessionEndpoint");
+ if (otse != null) {
+ Element serviceEpr = (Element) otse;
+ WSAEndpoint endpoint = new WSAEndpoint();
+ endpoint.set(serviceEpr);
+ pxeMex.setEndpointReference(endpoint);
+ }
+ if (ocse != null) {
+ Element serviceEpr = (Element) ocse;
+ WSAEndpoint endpoint = new WSAEndpoint();
+ endpoint.set(serviceEpr);
+ pxeMex.setCallbackEndpointReference(endpoint);
}
-
- dest.setMessage(pxemsg);
}
- private void fillEnvelope(MyRoleMessageExchange mex, SOAPEnvelope envelope) throws AxisFault {
- Message resp = mex.getResponse();
- // TODO Handle messages style
- OMElement responseRoot =
- envelope.getOMFactory().createOMElement(mex.getOperation().getName() + "Response", null);
- envelope.getBody().addChild(responseRoot);
-
- Element srcPartEl = DOMUtils.getFirstChildElement(resp.getMessage());
- while (srcPartEl != null) {
- responseRoot.addChild(OMUtils.toOM(srcPartEl));
- srcPartEl = DOMUtils.getNextSiblingElement(srcPartEl);
+ /**
+ * Extracts endpoint information from PXE message exchange to stuff them into
+ * Axis MessageContext.
+ */
+ private void writeHeader(MessageContext msgContext, MyRoleMessageExchange pxeMex) {
+ if (pxeMex.getEndpointReference() != null) {
+ msgContext.setProperty("targetSessionEndpoint", pxeMex.getEndpointReference());
+ }
+ if (pxeMex.getCallbackEndpointReference() != null) {
+ msgContext.setProperty("callbackSessionEndpoint", pxeMex.getCallbackEndpointReference());
}
}
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/SOAPUtils.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/SOAPUtils.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/SOAPUtils.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/SOAPUtils.java Wed Jul 19 16:57:40 2006
@@ -46,11 +46,9 @@
srcPartEl = DOMUtils.getNextSiblingElement(srcPartEl);
}
} else {
- // Extracting element name: parts can't be a type as per WS-BP
- QName elmtName = ((Part)msgDef.getParts().get(0)).getElementName();
- Element responseRoot = doc.createElementNS(elmtName.getNamespaceURI(), elmtName.getLocalPart());
- // Message style has only one part, directly included in the body.
- doc.appendChild(responseRoot);
+ // Removing message element and part, message services have only one part which is
+ // the root element of the message.
+ doc.appendChild(doc.importNode(DOMUtils.getFirstChildElement(DOMUtils.getFirstChildElement(message)), true));
}
return doc.getDocumentElement();
}
@@ -72,9 +70,9 @@
Element msgElmt = doc.createElement("message");
doc.appendChild(msgElmt);
// Just making sure the part has no namespace
- Element destPart = doc.createElement(bodyElmt.getLocalName());
+ Element destPart = doc.createElement((String) msgDef.getParts().keySet().iterator().next());
msgElmt.appendChild(destPart);
- destPart.appendChild(doc.importNode(DOMUtils.getFirstChildElement(bodyElmt), true));
+ destPart.appendChild(doc.importNode(bodyElmt, true));
return msgElmt;
}
}
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEAxisDispatcher.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEAxisDispatcher.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEAxisDispatcher.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEAxisDispatcher.java Wed Jul 19 16:57:40 2006
@@ -54,8 +54,10 @@
log.debug("Checking for Operation using SOAP message body's first child's local name : "
+ localName);
operation = service.getOperation(new QName(localName));
- if (operation != null)
+ if (operation != null) {
+ log.debug("Found operation " + operation);
return operation;
+ }
// Of course, the element name most likely uses the suffix
// Request or Response, so look for those and strip them.
@@ -68,7 +70,7 @@
return service.getOperation(new QName(localName.substring(0, index)));
}
}
-
+ log.warn("No operation has been found!");
return null;
}
@@ -88,10 +90,12 @@
if (path != null) {
AxisConfiguration registry =
messageContext.getConfigurationContext().getAxisConfiguration();
- return registry.getService(path);
+ AxisService service = registry.getService(path);
+ log.debug("Found service in registry from name " + path + ": " + service);
+ return service;
}
}
-
+ log.warn("No service has been found!");
return null;
}
Modified: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEMessageReceiver.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEMessageReceiver.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEMessageReceiver.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/PXEMessageReceiver.java Wed Jul 19 16:57:40 2006
@@ -41,15 +41,15 @@
engine.send(outMsgContext);
} else {
// No response expected, this thread doesn't need us
- _executorService.submit(new Runnable() {
- public void run() {
- try {
+// _executorService.submit(new Runnable() {
+// public void run() {
+// try {
invokeBusinessLogic(msgContext, null);
- } catch (AxisFault axisFault) {
- __log.error("Error process in-only message.", axisFault);
- }
- }
- });
+// } catch (AxisFault axisFault) {
+// __log.error("Error processing in-only message.", axisFault);
+// }
+// }
+// });
}
}
Added: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionInHandler.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionInHandler.java?rev=423668&view=auto
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionInHandler.java (added)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionInHandler.java Wed Jul 19 16:57:40 2006
@@ -0,0 +1,86 @@
+package com.fs.pxe.axis2.hooks;
+
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.AxisFault;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.w3c.dom.Element;
+import org.w3c.dom.Document;
+import com.fs.utils.Namespaces;
+import com.fs.utils.DOMUtils;
+
+import javax.xml.namespace.QName;
+
+/**
+ * An incoming handler adding session id information in the message
+ * context.
+ */
+public class SessionInHandler extends AbstractHandler {
+
+ private static final Log __log = LogFactory.getLog(SessionInHandler.class);
+
+ public void invoke(MessageContext messageContext) throws AxisFault {
+ SOAPHeader header = messageContext.getEnvelope().getHeader();
+ if (header != null) {
+ if (__log.isDebugEnabled())
+ __log.debug("Found a header in incoming message, checking if there are endpoints there.");
+ // Checking if a session identifier has been provided for a stateful endpoint
+ OMElement wsaToSession = header.getFirstChildWithName(new QName(Namespaces.INTALIO_SESSION_NS, "session"));
+ if (wsaToSession != null) {
+ // Building an endpoint supposed to target the right instance
+ Document doc = DOMUtils.newDocument();
+ Element serviceEpr = doc.createElementNS(Namespaces.WS_ADDRESSING_NS, "EndpointReference");
+ Element sessionId = doc.createElementNS(Namespaces.INTALIO_SESSION_NS, "session");
+ doc.appendChild(serviceEpr);
+ serviceEpr.appendChild(sessionId);
+ sessionId.setTextContent(wsaToSession.getText());
+ if (__log.isDebugEnabled())
+ __log.debug("A TO endpoint has been found in the header with session: " + wsaToSession.getText());
+
+ // Did the client provide an address too?
+ OMElement wsaToAddress = header.getFirstChildWithName(new QName(Namespaces.WS_ADDRESSING_NS, "To"));
+ if (wsaToAddress != null) {
+ Element addressElmt = doc.createElementNS(Namespaces.WS_ADDRESSING_NS, "Address");
+ addressElmt.setTextContent(wsaToAddress.getText());
+ serviceEpr.appendChild(addressElmt);
+ }
+ if (__log.isDebugEnabled())
+ __log.debug("Constructed a TO endpoint: " + DOMUtils.domToString(serviceEpr));
+ messageContext.setProperty("targetSessionEndpoint", serviceEpr);
+ }
+
+ // Seeing if there's a callback, in case our client would be stateful as well
+ OMElement callback = header.getFirstChildWithName(new QName(Namespaces.INTALIO_SESSION_NS, "callback"));
+ if (callback != null) {
+ OMElement callbackSession = callback.getFirstChildWithName(new QName(Namespaces.INTALIO_SESSION_NS, "session"));
+ if (callbackSession != null) {
+ // Building an endpoint that represents our client (we're supposed to call him later on)
+ Document doc = DOMUtils.newDocument();
+ Element serviceEpr = doc.createElementNS(Namespaces.WS_ADDRESSING_NS, "EndpointReference");
+ Element sessionId = doc.createElementNS(Namespaces.INTALIO_SESSION_NS, "session");
+ doc.appendChild(serviceEpr);
+ serviceEpr.appendChild(sessionId);
+ sessionId.setTextContent(callbackSession.getText());
+ if (__log.isDebugEnabled())
+ __log.debug("A CALLBACK endpoint has been found in the header with session: " + callbackSession.getText());
+
+ // Did the client give his address as well?
+ OMElement wsaToAddress = callback.getFirstChildWithName(new QName(Namespaces.WS_ADDRESSING_NS, "Address"));
+ if (wsaToAddress != null) {
+ Element addressElmt = doc.createElementNS(Namespaces.WS_ADDRESSING_NS, "Address");
+ addressElmt.setTextContent(wsaToAddress.getText());
+ serviceEpr.appendChild(addressElmt);
+ }
+ if (__log.isDebugEnabled())
+ __log.debug("Constructed a CALLBACK endpoint: " + DOMUtils.domToString(serviceEpr));
+ messageContext.setProperty("callbackSessionEndpoint", serviceEpr);
+ }
+ }
+
+ }
+ }
+
+}
Added: incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionOutHandler.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionOutHandler.java?rev=423668&view=auto
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionOutHandler.java (added)
+++ incubator/ode/scratch/pxe-iapi/axis2/src/main/java/com/fs/pxe/axis2/hooks/SessionOutHandler.java Wed Jul 19 16:57:40 2006
@@ -0,0 +1,73 @@
+package com.fs.pxe.axis2.hooks;
+
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.AxisFault;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeaderBlock;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import com.fs.utils.Namespaces;
+import com.fs.pxe.bpel.iapi.EndpointReference;
+import com.fs.pxe.bpel.epr.WSAEndpoint;
+
+/**
+ * An outgoing handler adding session id information in the message
+ * context.
+ */
+public class SessionOutHandler extends AbstractHandler {
+
+ private static final Log __log = LogFactory.getLog(SessionOutHandler.class);
+
+ public void invoke(MessageContext messageContext) throws AxisFault {
+ Object otargetSession = messageContext.getProperty("targetSessionEndpoint");
+ Object ocallbackSession = messageContext.getProperty("callbackSessionEndpoint");
+ if (otargetSession == null)
+ otargetSession = messageContext.getOptions().getProperty("targetSessionEndpoint");
+ if (ocallbackSession == null)
+ ocallbackSession = messageContext.getOptions().getProperty("callbackSessionEndpoint");
+
+ if (otargetSession != null || ocallbackSession != null) {
+ SOAPHeader header = messageContext.getEnvelope().getHeader();
+ SOAPFactory factory = (SOAPFactory) messageContext.getEnvelope().getOMFactory();
+ OMNamespace intalioSessNS = factory.createOMNamespace(Namespaces.INTALIO_SESSION_NS, "intalio");
+ OMNamespace wsAddrNS = factory.createOMNamespace(Namespaces.WS_ADDRESSING_NS, "addr");
+ if (header == null) {
+ header = factory.createSOAPHeader(messageContext.getEnvelope());
+ }
+ if (otargetSession != null) {
+ WSAEndpoint targetEpr = (WSAEndpoint) otargetSession;
+ OMElement to = factory.createOMElement("To", wsAddrNS);
+ header.addChild(to);
+ to.setText(targetEpr.getUrl());
+ if (targetEpr.getSessionId() != null) {
+ OMElement session = factory.createOMElement("session", intalioSessNS);
+ header.addChild(session);
+ session.setText(targetEpr.getSessionId());
+ }
+ __log.debug("Sending stateful TO epr in message header using session " + targetEpr.getSessionId());
+ }
+ if (ocallbackSession != null) {
+ WSAEndpoint callbackEpr = (WSAEndpoint) ocallbackSession;
+ OMElement callback = factory.createOMElement("callback", intalioSessNS);
+ header.addChild(callback);
+ OMElement address = factory.createOMElement("Address", wsAddrNS);
+ callback.addChild(address);
+ address.setText(callbackEpr.getUrl());
+ if (callbackEpr.getSessionId() != null) {
+ OMElement session = factory.createOMElement("session", intalioSessNS);
+ session.setText(callbackEpr.getSessionId());
+ callback.addChild(session);
+ }
+ __log.debug("Sending stateful FROM epr in message header using session " + callbackEpr.getSessionId());
+ }
+
+ __log.debug("Sending a message containing wsa endpoints in headers for session passing.");
+ __log.debug(messageContext.getEnvelope().toString());
+
+ }
+ }
+}
Modified: incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/dao/MessageExchangeDAO.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/dao/MessageExchangeDAO.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/dao/MessageExchangeDAO.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/dao/MessageExchangeDAO.java Wed Jul 19 16:57:40 2006
@@ -143,6 +143,10 @@
Element getEPR();
+ void setCallbackEPR(Element epr);
+
+ Element getCallbackEPR();
+
String getPattern();
/**
Modified: incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSAEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSAEndpoint.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSAEndpoint.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSAEndpoint.java Wed Jul 19 16:57:40 2006
@@ -6,7 +6,10 @@
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import javax.xml.namespace.QName;
import java.util.HashMap;
import java.util.Map;
@@ -15,19 +18,21 @@
*/
public class WSAEndpoint implements MutableEndpoint {
+ private static final Log __log = LogFactory.getLog(WSAEndpoint.class);
+
private Element _eprElmt;
public String getSessionId() {
- NodeList idNodes = _eprElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ NodeList idNodes = _eprElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "session");
if (idNodes.getLength() > 0) return idNodes.item(0).getTextContent();
else return null;
}
public void setSessionId(String sessionId) {
- NodeList idList = _eprElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ NodeList idList = _eprElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "session");
if (idList.getLength() > 0) idList.item(0).setTextContent(sessionId);
else {
- Element sessElmt = _eprElmt.getOwnerDocument().createElementNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ Element sessElmt = _eprElmt.getOwnerDocument().createElementNS(Namespaces.INTALIO_SESSION_NS, "session");
sessElmt.setTextContent(sessionId);
_eprElmt.appendChild(sessElmt);
}
@@ -47,10 +52,33 @@
}
}
+ public QName getServiceName() {
+ NodeList metadataList = _eprElmt.getElementsByTagNameNS(Namespaces.WS_ADDRESSING_NS, "Metadata");
+ if (metadataList.getLength() > 0) {
+ Element metadata = (Element) metadataList.item(0);
+ Element service = DOMUtils.getFirstChildElement(metadata);
+ String serviceTextQName = service.getTextContent();
+ int twoDotsIdx = serviceTextQName.indexOf(":");
+ String prefix = serviceTextQName.substring(0, twoDotsIdx);
+ String serviceNS = _eprElmt.getOwnerDocument().lookupNamespaceURI(prefix);
+ // Lookup failed, checking directly on our element
+ if (serviceNS == null) {
+ serviceNS = service.getAttribute("xmlns:" + prefix);
+ }
+ if (serviceNS == null) __log.warn("Couldn't find an appropriate namespace for service!");
+ QName result = new QName(serviceNS, serviceTextQName.substring(twoDotsIdx + 1, serviceTextQName.length()));
+ if (__log.isDebugEnabled())
+ __log.debug("Got service name from WSAEndpoint: " + result);
+ return result;
+ }
+ return null;
+ }
+
public boolean accept(Node node) {
if (node.getNodeType() == Node.ELEMENT_NODE) {
Element elmt = (Element)node;
- if (elmt.getLocalName().equals("service-ref") && elmt.getNamespaceURI().equals(Namespaces.WS_BPEL_20_NS))
+ if (elmt.getLocalName().equals(SERVICE_REF_QNAME.getLocalPart())
+ && elmt.getNamespaceURI().equals(SERVICE_REF_QNAME.getNamespaceURI()))
elmt= DOMUtils.getFirstChildElement(elmt);
if (elmt.getLocalName().equals("EndpointReference") && elmt.getNamespaceURI().equals(Namespaces.WS_ADDRESSING_NS))
return true;
@@ -63,22 +91,49 @@
_eprElmt = DOMUtils.getFirstChildElement((Element)node);
else
_eprElmt = (Element) node;
+ if (__log.isDebugEnabled())
+ __log.debug("Setting a WSAEndpoint value: " + DOMUtils.domToString(_eprElmt));
}
public Document toXML() {
// Wrapping
Document doc = DOMUtils.newDocument();
- Element serviceRef = doc.createElementNS(Namespaces.WS_BPEL_20_NS, "service-ref");
+ Element serviceRef = doc.createElementNS(SERVICE_REF_QNAME.getNamespaceURI(), SERVICE_REF_QNAME.getLocalPart());
doc.appendChild(serviceRef);
serviceRef.appendChild(doc.importNode(_eprElmt, true));
return _eprElmt.getOwnerDocument();
}
public Map toMap() {
- HashMap<String,String> result = new HashMap<String,String>();
+ HashMap<String,Object> result = new HashMap<String,Object>();
result.put(ADDRESS, getUrl());
String sid = getSessionId();
if (sid != null) result.put(SESSION, sid);
+ NodeList metadataList = _eprElmt.getElementsByTagNameNS(Namespaces.WS_ADDRESSING_NS, "Metadata");
+ if (metadataList.getLength() > 0) {
+ Element metadata = (Element) metadataList.item(0);
+ Element service = DOMUtils.getFirstChildElement(metadata);
+ String serviceTextQName = service.getTextContent();
+ int twoDotsIdx = serviceTextQName.indexOf(":");
+ String prefix = serviceTextQName.substring(0, twoDotsIdx);
+ String serviceNS = null;
+ try {
+ serviceNS = _eprElmt.getOwnerDocument().lookupNamespaceURI(prefix);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ // Lookup failed, checking directly on our element
+ if (serviceNS == null) {
+ serviceNS = service.getAttribute("xmlns:" + prefix);
+ }
+ result.put(SERVICE_QNAME, new QName(serviceNS,
+ serviceTextQName.substring(twoDotsIdx + 1, serviceTextQName.length())));
+ result.put(PORT_NAME, service.getAttribute("EndpointName"));
+ if (__log.isDebugEnabled()) {
+ __log.debug("Filled transfo map with service: " + result.get(SERVICE_QNAME));
+ __log.debug("Filled transfo map with port: " + result.get(PORT_NAME));
+ }
+ }
return result;
}
@@ -91,10 +146,22 @@
Element addrElmt = doc.createElementNS(Namespaces.WS_ADDRESSING_NS, "Address");
addrElmt.setTextContent((String) eprMap.get(ADDRESS));
if (eprMap.get(SESSION) != null) {
- Element sessElmt = doc.createElementNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ Element sessElmt = doc.createElementNS(Namespaces.INTALIO_SESSION_NS, "session");
sessElmt.setTextContent((String) eprMap.get(SESSION));
_eprElmt.appendChild(sessElmt);
}
+ if (eprMap.get(SERVICE_QNAME) != null) {
+ Element metadataElmt = doc.createElementNS(Namespaces.WS_ADDRESSING_NS, "Metadata");
+ _eprElmt.appendChild(metadataElmt);
+ Element serviceElmt = doc.createElementNS(Namespaces.WS_ADDRESSING_WSDL_NS, "ServiceName");
+ metadataElmt.appendChild(serviceElmt);
+ QName serviceQName = (QName) eprMap.get(SERVICE_QNAME);
+ serviceElmt.setAttribute("xmlns:servicens", serviceQName.getNamespaceURI());
+ serviceElmt.setTextContent("servicens:" + serviceQName.getLocalPart());
+ serviceElmt.setAttribute("EndpointName", (String) eprMap.get(PORT_NAME));
+ }
_eprElmt.appendChild(addrElmt);
+ if (__log.isDebugEnabled())
+ __log.debug("Constructed a new WSAEndpoint: " + DOMUtils.domToString(_eprElmt));
}
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSDL20Endpoint.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSDL20Endpoint.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSDL20Endpoint.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/epr/WSDL20Endpoint.java Wed Jul 19 16:57:40 2006
@@ -22,17 +22,17 @@
public String getSessionId() {
Element endpointElmt = (Element)_serviceElmt.getElementsByTagNameNS(Namespaces.WSDL_20, "endpoint").item(0);
- NodeList idNodes = endpointElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ NodeList idNodes = endpointElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "session");
if (idNodes.getLength() > 0) return idNodes.item(0).getTextContent();
else return null;
}
public void setSessionId(String sessionId) {
Element endpointElmt = (Element)_serviceElmt.getElementsByTagNameNS(Namespaces.WSDL_20, "endpoint").item(0);
- NodeList idList = endpointElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ NodeList idList = endpointElmt.getElementsByTagNameNS(Namespaces.INTALIO_SESSION_NS, "session");
if (idList.getLength() > 0) idList.item(0).setTextContent(sessionId);
else {
- Element sessElmt = _serviceElmt.getOwnerDocument().createElementNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ Element sessElmt = _serviceElmt.getOwnerDocument().createElementNS(Namespaces.INTALIO_SESSION_NS, "session");
sessElmt.setTextContent(sessionId);
endpointElmt.appendChild(sessElmt);
}
@@ -101,7 +101,7 @@
endpoint.setAttribute("binding", "");
if (eprMap.get(ADDRESS) != null) endpoint.setAttribute("address", (String) eprMap.get(ADDRESS));
if (eprMap.get(SESSION) != null) {
- Element session = doc.createElementNS(Namespaces.INTALIO_SESSION_NS, "identifier");
+ Element session = doc.createElementNS(Namespaces.INTALIO_SESSION_NS, "session");
session.setTextContent((String) eprMap.get(SESSION));
endpoint.appendChild(session);
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/Message.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/Message.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/Message.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/Message.java Wed Jul 19 16:57:40 2006
@@ -35,14 +35,14 @@
* @param partName name of the part
* @return named {@l
*/
- Content getPart(String partName);
+ Element getPart(String partName);
/**
* Set the message part.
* @param partName name of part
* @param content part content
*/
- void setMessagePart(String partName, Content content);
+ void setMessagePart(String partName, Element content);
/**
* Set the message as an element. The name of the element is irrelevant,
Modified: incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MessageExchange.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MessageExchange.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MessageExchange.java Wed Jul 19 16:57:40 2006
@@ -97,6 +97,17 @@
EndpointReference getEndpointReference()
throws BpelEngineException;
+ void setEndpointReference(EndpointReference ref);
+
+ /**
+ * Get a reference to the end-point that originated this message exchange and
+ * will be use for an eventual invocation later.
+ * @return end-point reference for this message exchange
+ */
+ EndpointReference getCallbackEndpointReference()
+ throws BpelEngineException;
+
+ void setCallbackEndpointReference(EndpointReference ref);
/**
* Return the type of message-exchange that resulted form this invocation
Modified: incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MyRoleMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MyRoleMessageExchange.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MyRoleMessageExchange.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-api/src/main/java/com/fs/pxe/bpel/iapi/MyRoleMessageExchange.java Wed Jul 19 16:57:40 2006
@@ -74,4 +74,6 @@
* @return service name
*/
public QName getServiceName();
+
+ public void setEndpointReference(EndpointReference ref);
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java Wed Jul 19 16:57:40 2006
@@ -31,9 +31,9 @@
import com.fs.pxe.bpel.runtime.PROCESS;
import com.fs.pxe.bpel.runtime.PropertyAliasEvaluationContext;
import com.fs.pxe.bpel.runtime.msgs.Messages;
+import com.fs.pxe.bpel.epr.WSAEndpoint;
import com.fs.utils.ArrayUtils;
import com.fs.utils.ObjectPrinter;
-import com.fs.utils.DOMUtils;
import com.fs.utils.msg.MessageBundle;
import java.io.*;
@@ -182,9 +182,6 @@
msgData, alias);
Node lValue = ectx.getRootNode();
-
- System.out.println("####### Evaluating property alias " + alias + " on " + DOMUtils.domToString(msgData));
-
if (alias.location != null)
try {
lValue = _expLangRuntimeRegistry.evaluateNode(alias.location, ectx);
@@ -202,21 +199,12 @@
throw new FaultException(_oprocess.constants.qnSelectionFailure, errmsg);
}
- if (!(lValue instanceof Node)) {
- String errmsg = __msgs.msgPropertyAliasReturnedRValue(alias
- .getDescription(), target);
- if (__log.isErrorEnabled()) {
- __log.error(errmsg);
- }
- throw new FaultException(_oprocess.constants.qnSelectionFailure, errmsg);
- }
-
if (lValue.getNodeType() == Node.ELEMENT_NODE) {
// This is a bit hokey, we concatenate all the children's values; we
// really should be checking to make sure that we are only dealing with
// text and attribute nodes.
StringBuffer val = new StringBuffer();
- NodeList nl = ((Element) lValue).getChildNodes();
+ NodeList nl = lValue.getChildNodes();
for (int i = 0; i < nl.getLength(); ++i) {
Node n = nl.item(i);
val.append(n.getNodeValue());
@@ -373,9 +361,10 @@
// Handling the "opaque correlation case": correlation is done on a
// session identifier associated with my epr
if (messageRoute == null) {
- // TODO: get the session id (from message exchnage?)
- String sessionId = null; //messageExchange.getSessionId();
- if (sessionId != null) {
+ EndpointReference ref = mex.getEndpointReference();
+ // Stateful interactions are only supported with WSA endpoints
+ if (ref != null && ref instanceof WSAEndpoint) {
+ String sessionId = ((WSAEndpoint)ref).getSessionId();
CorrelationKey key = new CorrelationKey(-1,
new String[] { sessionId });
messageRoute = correlator.findRoute(key);
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelRuntimeContextImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelRuntimeContextImpl.java Wed Jul 19 16:57:40 2006
@@ -44,9 +44,11 @@
import com.fs.pxe.bpel.runtime.channels.TimerResponseChannel;
import com.fs.pxe.bpel.epr.MutableEndpoint;
import com.fs.pxe.bpel.epr.WSDL11Endpoint;
+import com.fs.pxe.bpel.epr.WSAEndpoint;
import com.fs.utils.DOMUtils;
import com.fs.utils.ObjectPrinter;
import com.fs.utils.Namespaces;
+import com.fs.utils.GUID;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -66,6 +68,7 @@
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import sun.rmi.transport.Endpoint;
class BpelRuntimeContextImpl implements BpelRuntimeContext {
@@ -205,7 +208,7 @@
sendEvent(new ProcessCompletionEvent(null));
_dao.finishCompletion();
- failOutstandingMessageExchanges();
+ completeOutstandingMessageExchanges();
}
/**
@@ -415,7 +418,7 @@
}
private PartnerLinkDAO fetchEndpointReference(PartnerLinkInstance pLink,
- boolean isMyEPR) throws FaultException {
+ boolean isMyEPR) throws FaultException {
ScopeDAO scopeDAO = _dao.getScope(pLink.scopeInstanceId);
PartnerLinkDAO spl = scopeDAO.getPartnerLink(pLink.partnerLink.getId());
PartnerLinkDAO ppl = scopeDAO.getProcessInstance().getProcess()
@@ -471,7 +474,6 @@
public Element writeEndpointReference(PartnerLinkInstance variable,
Element data) throws FaultException {
- fetchEndpointReferenceData(variable, false);
PartnerLinkDAO eprDAO = fetchEndpointReference(variable, false);
Element originalEprElmt = eprDAO.getPartnerEPR();
@@ -481,7 +483,7 @@
}
// Merging target endpoint with original endpoint and converting everything to
- // a WSDL 1.1 service.
+ // a WSA endpoint reference.
Map conversionMap = new HashMap();
if (originalEprElmt != null) {
MutableEndpoint originalEpr =
@@ -491,7 +493,7 @@
MutableEndpoint targetEpr =
(MutableEndpoint) _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(data);
conversionMap.putAll(targetEpr.toMap());
- WSDL11Endpoint mergedEpr = new WSDL11Endpoint();
+ WSAEndpoint mergedEpr = new WSAEndpoint();
mergedEpr.fromMap(conversionMap);
eprDAO.setPartnerEPR(mergedEpr.toXML().getDocumentElement());
@@ -501,46 +503,48 @@
public String fetchEndpointSessionId(PartnerLinkInstance pLink,
boolean isMyEPR) throws FaultException {
Element eprElmt = fetchEndpointReferenceData(pLink, isMyEPR);
- // This is rather ugly as we're assuming that the session identifier is
- // always a direct
- // child of the epr element. However I rather that than adding a specific
- // session id
- // field in the DAO.
- NodeList ids = eprElmt.getElementsByTagNameNS(
- Namespaces.INTALIO_SESSION_NS, "identifier");
- if (ids.getLength() > 0)
- return ids.item(0).getTextContent();
- else
- return null;
+ // Internally our EPR are always WSA
+ WSAEndpoint wsaEndpoint = new WSAEndpoint();
+ wsaEndpoint.set(eprElmt);
+ return wsaEndpoint.getSessionId();
}
- private Element fetchMyRoleSessionEndpoint(PartnerLinkInstance pLink)
+ private EndpointReference fetchMyRoleSessionEndpoint(PartnerLinkInstance pLink)
throws FaultException {
- // TODO: Rework this: perhaps the EPR should not be set in the
- // TODO: database until this point?
- // Element myRoleEpr = eprDao.getEPR();
- // if (myRoleEpr == null)
- // throw new FaultException(
- // _bpelProcess._oprocess.constants.qnUninitializedPartnerRole,
- // "Endpoint reference for myRole on partner link "
- // + pLink.partnerLink.getName() + " isn't initialized!");
- // Element result = null; // TODO:
- // _bpelProcess._context.checkMyEndpoint(myRoleEpr);
- // // The endpoint has been modified to include a session id
- // if (myRoleEpr != result) {
- // eprDao.setEPR(result);
- // }
- return null;
- }
-
- public Element updatePartnerEndpointReference(PartnerLinkInstance variable,
- Element data) throws FaultException {
- ScopeDAO scopeDAO = _dao.getScope(variable.scopeInstanceId);
- PartnerLinkDAO eprDAO = scopeDAO.getPartnerLink(variable.partnerLink
- .getId());
- eprDAO.setPartnerEPR(data);
- return data;
- }
+ PartnerLinkDAO pl = fetchEndpointReference(pLink, true);
+ Element myRoleEpr = pl.getMyEPR();
+ if (myRoleEpr == null) throw new FaultException(
+ _bpelProcess._oprocess.constants.qnUninitializedPartnerRole,
+ "Endpoint reference for myRole on partner link " + pLink.partnerLink.getName() + " isn't initialized!");
+
+ EndpointReference resolvedEpr = _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(myRoleEpr);
+ WSAEndpoint wsaEpr;
+ // We're usually careful to only deal with WSA endpoints internally, because
+ // it's much more convenient, but another endpoint could slip through (for
+ // example if it's set in the PM API).
+ if (resolvedEpr instanceof WSAEndpoint) {
+ wsaEpr = (WSAEndpoint) resolvedEpr;
+ } else {
+ wsaEpr = (WSAEndpoint) _bpelProcess._engine._contexts.eprContext
+ .convertEndpoint(Namespaces.WS_ADDRESSING_ENDPOINT, myRoleEpr);
+ }
+
+ // Including a session id in our EPR as it could be needed
+ if (wsaEpr.getSessionId() == null) {
+ wsaEpr.setSessionId(new GUID().toString());
+ myRoleEpr = wsaEpr.toXML().getDocumentElement();
+ pl.setMyEPR(myRoleEpr);
+ }
+ return wsaEpr;
+ }
+
+// public Element updatePartnerEndpointReference(PartnerLinkInstance variable,
+// Element data) throws FaultException {
+// ScopeDAO scopeDAO = _dao.getScope(variable.scopeInstanceId);
+// PartnerLinkDAO eprDAO = scopeDAO.getPartnerLink(variable.partnerLink.getId());
+// eprDAO.setPartnerEPR(data);
+// return data;
+// }
public Node convertEndpointReference(Element sourceNode, Node targetNode) {
QName nodeQName;
@@ -698,7 +702,6 @@
we.setType(WorkEvent.Type.TIMER);
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we
.getDetail(), timeToFire);
-
}
public String invoke(PartnerLinkInstance partnerLink, Operation operation,
@@ -716,9 +719,10 @@
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
// Getting my endpoint (if initialized and needed)
- Node myEPR = null;
+ EndpointReference myEPR = null;
if (partnerLink.partnerLink.hasMyRole())
myEPR = fetchMyRoleSessionEndpoint(partnerLink);
+
Element partnerEPR = fetchEndpointReferenceData(partnerLink, false);
MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(
@@ -730,13 +734,11 @@
mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
mexDao.setProcess(_dao.getProcess());
mexDao.setInstance(_dao);
- mexDao
- .setPattern((operation.getOutput() != null ? MessageExchangePattern.REQUEST_RESPONSE
+ mexDao.setPattern((operation.getOutput() != null ? MessageExchangePattern.REQUEST_RESPONSE
: MessageExchangePattern.REQUEST_ONLY).toString());
mexDao.setEPR(partnerEPR);
mexDao.setChannel(channel == null ? null : channel.export());
- MessageDAO message = mexDao.createMessage(operation.getInput().getMessage()
- .getQName());
+ MessageDAO message = mexDao.createMessage(operation.getInput().getMessage().getQName());
mexDao.setRequest(message);
message.setData(outgoingMessage);
message.setType(operation.getInput().getMessage().getQName());
@@ -745,6 +747,7 @@
PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(
_bpelProcess._engine, mexDao,
partnerLink.partnerLink.partnerRolePortType, operation, partnerEndpoint);
+ if (myEPR != null) mex.setCallbackEndpointReference(myEPR);
// If we couldn't find the endpoint, then there is no sense
// in asking the IL to invoke.
@@ -756,10 +759,14 @@
mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint",
partnerEPR);
}
-
evt.setMexId(mexDao.getMessageExchangeId());
sendEvent(evt);
+ // MEX pattern is request only, at this point the status can only be a one way
+ if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_ONLY.toString())) {
+ mexDao.setStatus(MessageExchange.Status.ONE_WAY.toString());
+ }
+
// Check if there is a synchronous response, if so, we need to inject the
// message on the response channel.
switch (mex.getStatus()) {
@@ -996,6 +1003,31 @@
}
}
+ private void completeOutstandingMessageExchanges() {
+ String[] mexRefs = _outstandingRequests.releaseAll();
+ for (String mexId : mexRefs) {
+ MessageExchangeDAO mexDao = _dao.getConnection()
+ .getMessageExchange(mexId);
+ MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(
+ _bpelProcess._engine, mexDao);
+ switch(mex.getStatus()) {
+ case ASYNC:
+ case ONE_WAY:
+ case RESPONSE:
+ mex.setStatus(MessageExchange.Status.COMPLETED_OK);
+ break;
+ case REQUEST:
+ if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
+ mex.setStatus(MessageExchange.Status.COMPLETED_OK);
+ break;
+ }
+ default:
+ mex.setFailure(FailureType.OTHER, "No response.", null);
+ _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
+ }
+ }
+ }
+
private void failOutstandingMessageExchanges() {
String[] mexRefs = _outstandingRequests.releaseAll();
for (String mexId : mexRefs) {
@@ -1116,8 +1148,8 @@
}
public Element getSourceEPR(String mexId) {
- // TODO Auto-generated method stub
- return null;
+ MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
+ return dao.getCallbackEPR();
}
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelServerImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelServerImpl.java Wed Jul 19 16:57:40 2006
@@ -580,7 +580,7 @@
throw new BpelEngineException(errmsg, ex);
}
- final ProcessDDInitializer pi = new ProcessDDInitializer(oprocess, defs, processDD);
+ final ProcessDDInitializer pi = new ProcessDDInitializer(oprocess, defs, processDD, _contexts.eprContext);
try {
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageExchangeImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageExchangeImpl.java Wed Jul 19 16:57:40 2006
@@ -33,11 +33,12 @@
protected EndpointReference _epr;
+ protected EndpointReference _callbackEpr;
+
protected final MessageExchangeDAO _dao;
/**
* Constructor: requires the minimal information for a message exchange.
- * @param id
* @param pattern
* @param opname
* @param epr
@@ -189,6 +190,34 @@
public Message createMessage(javax.xml.namespace.QName msgType) {
MessageDAO mdao = getDAO().createMessage(msgType);
return new MessageImpl(mdao);
+ }
+
+ public void setEndpointReference(EndpointReference ref) {
+ _epr = ref;
+ if (ref != null)
+ getDAO().setEPR(ref.toXML().getDocumentElement());
+ }
+
+ public EndpointReference getEndpointReference() throws BpelEngineException {
+ if (_epr != null) return _epr;
+ if (getDAO().getEPR() == null)
+ return null;
+
+ return _epr = _engine._contexts.eprContext.resolveEndpointReference(getDAO().getEPR());
+ }
+
+ public void setCallbackEndpointReference(EndpointReference ref) {
+ _callbackEpr = ref;
+ if (ref != null)
+ getDAO().setCallbackEPR(ref.toXML().getDocumentElement());
+ }
+
+ public EndpointReference getCallbackEndpointReference() throws BpelEngineException {
+ if (_callbackEpr != null) return _callbackEpr;
+ if (getDAO().getCallbackEPR() == null)
+ return null;
+
+ return _callbackEpr = _engine._contexts.eprContext.resolveEndpointReference(getDAO().getCallbackEPR());
}
QName getServiceName() {
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MessageImpl.java Wed Jul 19 16:57:40 2006
@@ -1,13 +1,15 @@
package com.fs.pxe.bpel.engine;
import java.util.List;
+import java.util.ArrayList;
import javax.xml.namespace.QName;
import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
import com.fs.pxe.bpel.dao.MessageDAO;
-import com.fs.pxe.bpel.iapi.Content;
import com.fs.pxe.bpel.iapi.Message;
public class MessageImpl implements Message {
@@ -20,14 +22,17 @@
_dao = message;
}
- public Content getPart(String partName) {
- // TODO Auto-generated method stub
- return null;
+ public Element getPart(String partName) {
+ Element message = getMessage();
+ NodeList eltList = message.getElementsByTagName(partName);
+ if (eltList.getLength() == 0) return null;
+ else return (Element) eltList.item(0);
}
- public void setMessagePart(String partName, Content content) {
- // TODO Auto-generated method stub
-
+ public void setMessagePart(String partName, Element content) {
+ Element message = getMessage();
+ message.appendChild(message.getOwnerDocument().importNode(content, true));
+ setMessage(message);
}
public void setMessage(Element msg) {
@@ -43,8 +48,15 @@
}
public List<String> getParts() {
- // TODO Auto-generated method stub
- return null;
+ ArrayList<String> parts = new ArrayList<String>();
+ Element message = getMessage();
+ NodeList nodeList = message.getChildNodes();
+ for (int m = 0; m < nodeList.getLength(); m++) {
+ Node node = nodeList.item(m);
+ if (node.getNodeType() == Node.ELEMENT_NODE)
+ parts.add(node.getLocalName());
+ }
+ return parts;
}
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MyRoleMessageExchangeImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/MyRoleMessageExchangeImpl.java Wed Jul 19 16:57:40 2006
@@ -66,20 +66,12 @@
}
-
public void complete() {
-
}
-
- public EndpointReference getEndpointReference() throws BpelEngineException {
- return null;
- }
-
public QName getServiceName() {
return getDAO().getCallee();
}
-
public void setClientId(String clientKey) {
getDAO().setCorrelationId(clientKey);
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/PartnerRoleMessageExchangeImpl.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/PartnerRoleMessageExchangeImpl.java Wed Jul 19 16:57:40 2006
@@ -30,21 +30,10 @@
public void replyOneWayOk() {
setStatus(Status.ONE_WAY);
}
-
-
- public EndpointReference getEndpointReference() throws BpelEngineException {
- if (_epr != null) return _epr;
- if (getDAO().getEPR() == null)
- return null;
-
- return _epr = _engine._contexts.eprContext.resolveEndpointReference(getDAO().getEPR());
- }
public void replyAsync() {
setStatus(Status.ASYNC);
}
-
-
public void replyWithFault(String faultType, Message outputFaultMessage) throws BpelEngineException {
boolean isAsync = isAsync();
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/ProcessDDInitializer.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/ProcessDDInitializer.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/ProcessDDInitializer.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/ProcessDDInitializer.java Wed Jul 19 16:57:40 2006
@@ -9,6 +9,7 @@
import com.fs.pxe.bpel.dao.PartnerLinkDAO;
import com.fs.pxe.bpel.iapi.BpelEngineException;
import com.fs.pxe.bpel.iapi.EndpointReference;
+import com.fs.pxe.bpel.iapi.EndpointReferenceContext;
import com.fs.pxe.bom.wsdl.Definition4BPEL;
import com.fs.pxe.axis2.dd.TDeployment;
import com.fs.pxe.axis2.dd.TInvoke;
@@ -39,11 +40,14 @@
private OProcess _oprocess;
private TDeployment.Process _dd;
private Definition4BPEL[] _defs;
+ private EndpointReferenceContext _eprContext;
- public ProcessDDInitializer(OProcess oprocess, Definition4BPEL[] defs, TDeployment.Process dd) {
+ public ProcessDDInitializer(OProcess oprocess, Definition4BPEL[] defs,
+ TDeployment.Process dd, EndpointReferenceContext eprContext) {
_oprocess = oprocess;
_dd = dd;
_defs = defs;
+ _eprContext = eprContext;
}
public void update(ProcessDAO processDAO) {
@@ -79,7 +83,11 @@
ProcessDDInitializer.__log.error(msg);
throw new BpelEngineException(msg);
}
- eprdao.setMyEPR(createServiceRef(epr));
+ // Making sure we're dealing only with WSA endpoints internally.
+ // These are much easier to deal with.
+ EndpointReference endpointRef = _eprContext.convertEndpoint(Namespaces.WS_ADDRESSING_ENDPOINT,
+ createServiceRef(epr));
+ eprdao.setMyEPR(endpointRef.toXML().getDocumentElement());
}
}
if (_dd.getInvokeList().size() > 0) {
@@ -112,7 +120,11 @@
ProcessDDInitializer.__log.error(msg);
throw new BpelEngineException(msg);
}
- eprdao.setPartnerEPR(createServiceRef(epr));
+ // Making sure we're dealing only with WSA endpoints internally.
+ // These are much easier to deal with.
+ EndpointReference endpointRef = _eprContext.convertEndpoint(Namespaces.WS_ADDRESSING_ENDPOINT,
+ createServiceRef(epr));
+ eprdao.setPartnerEPR(endpointRef.toXML().getDocumentElement());
}
}
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/ASSIGN.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/ASSIGN.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/ASSIGN.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/ASSIGN.java Wed Jul 19 16:57:40 2006
@@ -416,6 +416,7 @@
(rvalue.getNodeType() == Node.ELEMENT_NODE && !rvalue.getLocalName().equals("service-ref"))) {
Document doc = DOMUtils.newDocument();
Element serviceRef = doc.createElementNS(Namespaces.WS_BPEL_20_NS, "service-ref");
+ doc.appendChild(serviceRef);
serviceRef.appendChild(doc.importNode(rvalue, true));
rvalue = serviceRef;
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/BpelRuntimeContext.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/BpelRuntimeContext.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/BpelRuntimeContext.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/BpelRuntimeContext.java Wed Jul 19 16:57:40 2006
@@ -114,10 +114,8 @@
Node initializeVariable(VariableInstance var, Node initData);
- Element writeEndpointReference(PartnerLinkInstance variable, Element data) throws FaultException;
-
/**
- * Update (potentially partially) a partner EPR by consolidating the data received
+ * Writes (potentially partially) a partner EPR by consolidating the data received
* during a session-based interaction (message exchanges with session information)
* with the data we already have.
* @param variable
@@ -125,7 +123,7 @@
* @return the updated endpoint
* @throws FaultException
*/
- Element updatePartnerEndpointReference(PartnerLinkInstance variable, Element data) throws FaultException;
+ Element writeEndpointReference(PartnerLinkInstance variable, Element data) throws FaultException;
Node convertEndpointReference(Element epr, Node targetNode);
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/EH_EVENT.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/EH_EVENT.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/EH_EVENT.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/EH_EVENT.java Wed Jul 19 16:57:40 2006
@@ -212,10 +212,8 @@
// Trying to initialize partner epr based on a message-provided epr/session.
Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
if (fromEpr != null) {
- // TODO: EPR fixme
- throw new AssertionError("todo");
-// getBpelRuntimeContext().updatePartnerEndpointReference(
-// _scopeFrame.resolve(_oevent.partnerLink), fromEpr);
+ getBpelRuntimeContext().writeEndpointReference(
+ _scopeFrame.resolve(_oevent.partnerLink), (Element) fromEpr);
}
}
} catch (FaultException e) {
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/INVOKE.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/INVOKE.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/INVOKE.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/INVOKE.java Wed Jul 19 16:57:40 2006
@@ -32,8 +32,7 @@
}
public final void self() {
- Element outboundMsg = null;
-
+ Element outboundMsg;
try {
outboundMsg = setupOutbound(_oinvoke, _oinvoke.initCorrelationsInput);
} catch (FaultException e) {
@@ -75,8 +74,7 @@
FaultData fault = null;
try {
- Element response = getBpelRuntimeContext().getPartnerResponse(
- mexId);
+ Element response = getBpelRuntimeContext().getPartnerResponse(mexId);
getBpelRuntimeContext().initializeVariable(outputVar, response);
} catch (Exception ex) {
// TODO: Better error handling
@@ -84,11 +82,16 @@
}
try {
- for (Iterator iter = _oinvoke.initCorrelationsOutput.iterator(); iter
- .hasNext();) {
- OScope.CorrelationSet c = (OScope.CorrelationSet) iter.next();
-
- initializeCorrelation(_scopeFrame.resolve(c), outputVar);
+ for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
+ initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
+ }
+ if (_oinvoke.partnerLink.hasPartnerRole()) {
+ // Trying to initialize partner epr based on a message-provided epr/session.
+ Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
+ if (fromEpr != null) {
+ getBpelRuntimeContext().writeEndpointReference(
+ _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
+ }
}
} catch (FaultException e) {
fault = createFault(e.getQName(), _oinvoke);
@@ -130,11 +133,8 @@
Collection<OScope.CorrelationSet> outboundInitiations)
throws FaultException {
if (outboundInitiations.size() > 0) {
- for (Iterator<OScope.CorrelationSet> iter = outboundInitiations
- .iterator(); iter.hasNext();) {
- OScope.CorrelationSet c = iter.next();
- initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame
- .resolve(oinvoke.inputVar));
+ for (OScope.CorrelationSet c : outboundInitiations) {
+ initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.inputVar));
}
}
Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/PICK.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/PICK.java?rev=423668&r1=423667&r2=423668&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/PICK.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/PICK.java Wed Jul 19 16:57:40 2006
@@ -15,6 +15,7 @@
import com.fs.pxe.bpel.runtime.channels.PickResponseML;
import com.fs.pxe.bpel.runtime.channels.TerminationML;
import com.fs.utils.xsd.Duration;
+import com.fs.utils.DOMUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Element;
@@ -56,9 +57,7 @@
try {
selectors = new Selector[_opick.onMessages.size()];
int idx = 0;
- for (Iterator<OPickReceive.OnMessage> i = _opick.onMessages.iterator(); i.hasNext(); ) {
- OPickReceive.OnMessage onMessage = i.next();
-
+ for (OPickReceive.OnMessage onMessage : _opick.onMessages) {
CorrelationKey key = null;
PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(onMessage.partnerLink);
if (onMessage.matchCorrelation == null && !_opick.createInstanceFlag) {
@@ -67,29 +66,28 @@
if (!getBpelRuntimeContext().isEndpointReferenceInitialized(pLinkInstance, true))
throw new FaultException(_opick.getOwner().constants.qnCorrelationViolation,
"Endpoint reference for myRole on partner link " + onMessage.partnerLink + " has never been" +
- "initialized even though it's necessary for opaque correlations to work.");
+ "initialized even though it's necessary for opaque correlations to work.");
String sessionId = getBpelRuntimeContext().fetchEndpointSessionId(pLinkInstance, true);
- key = new CorrelationKey(-1, new String[] {sessionId});
+ key = new CorrelationKey(-1, new String[]{sessionId});
} else if (onMessage.matchCorrelation != null) {
if (!getBpelRuntimeContext().isCorrelationInitialized(_scopeFrame.resolve(onMessage.matchCorrelation)))
- throw new FaultException(_opick.getOwner().constants.qnCorrelationViolation,"Correlation not initialized.");
+ throw new FaultException(_opick.getOwner().constants.qnCorrelationViolation, "Correlation not initialized.");
key = getBpelRuntimeContext().readCorrelation(_scopeFrame.resolve(onMessage.matchCorrelation));
assert key != null;
}
- selectors[idx] = new Selector(idx,pLinkInstance,onMessage.operation.getName(), onMessage.operation.getOutput() == null, onMessage.messageExchangeId, key);
+ selectors[idx] = new Selector(idx, pLinkInstance, onMessage.operation.getName(), onMessage.operation.getOutput() == null, onMessage.messageExchangeId, key);
idx++;
}
timeout = null;
- for(Iterator<OPickReceive.OnAlarm> i = _opick.onAlarms.iterator(); i.hasNext(); ){
- OPickReceive.OnAlarm onAlarm = i.next();
+ for (OPickReceive.OnAlarm onAlarm : _opick.onAlarms) {
Date dt = onAlarm.forExpr != null
- ? offsetFromNow(getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(onAlarm.forExpr,getEvaluationContext()))
+ ? offsetFromNow(getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(onAlarm.forExpr, getEvaluationContext()))
: getBpelRuntimeContext().getExpLangRuntime().evaluateAsDate(onAlarm.untilExpr, getEvaluationContext()).getTime();
- if(timeout == null || timeout.compareTo(dt) > 0){
+ if (timeout == null || timeout.compareTo(dt) > 0) {
timeout = dt;
_alarm = onAlarm;
}
@@ -107,8 +105,7 @@
}
// Dead path all the alarms that have no chace of coming first.
- for (Iterator<OPickReceive.OnAlarm> i = _opick.onAlarms.iterator(); i.hasNext(); ) {
- OPickReceive.OnAlarm oa = i.next();
+ for (OPickReceive.OnAlarm oa : _opick.onAlarms) {
if (!oa.equals(_alarm)) {
dpe(oa.activity);
}
@@ -147,8 +144,7 @@
OPickReceive.OnMessage onMessage = _opick.onMessages.get(selectorIdx);
// dead path the non-selected onMessage blocks.
- for (Iterator<OPickReceive.OnMessage> i = _opick.onMessages.iterator();i.hasNext();) {
- OPickReceive.OnMessage onmsg = i.next();
+ for (OPickReceive.OnMessage onmsg : _opick.onMessages) {
if (!onmsg.equals(onMessage)) {
dpe(onmsg.activity);
}
@@ -159,9 +155,7 @@
dpe(_alarm.activity);
}
- FaultData fault = null;
-
-
+ FaultData fault;
Element msgEl = getBpelRuntimeContext().getMyRequest(mexId);
try {
getBpelRuntimeContext().initializeVariable(_scopeFrame.resolve(onMessage.variable),msgEl);
@@ -177,10 +171,11 @@
// Trying to initialize partner epr based on a message-provided epr/session.
Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
if (fromEpr != null) {
- // TODO: fixme
- throw new AssertionError("todo");
-// getBpelRuntimeContext().updatePartnerEndpointReference(
-// _scopeFrame.resolve(onMessage.partnerLink), fromEpr);
+ if (__log.isDebugEnabled())
+ __log.debug("Received callback EPR " + DOMUtils.domToString(fromEpr)
+ + " saving it on partner link " + onMessage.partnerLink.getName());
+ getBpelRuntimeContext().writeEndpointReference(
+ _scopeFrame.resolve(onMessage.partnerLink), (Element) fromEpr);
}
}
} catch (FaultException e) {
@@ -199,8 +194,8 @@
public void onTimeout() {
// Dead path all the onMessage activiites (the other alarms have already been DPE'ed)
- for (Iterator<OPickReceive.OnMessage> i = _opick.onMessages.iterator(); i.hasNext(); ) {
- dpe(i.next().activity);
+ for (OPickReceive.OnMessage onMessage : _opick.onMessages) {
+ dpe(onMessage.activity);
}
// Because we are done with all the DPE, we can simply re-use our control