You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/11/27 17:23:41 UTC
svn commit: r479678 - in
/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2:
ExternalService.java ODEServer.java ODEService.java P2PMexContextImpl.java
Author: mriou
Date: Mon Nov 27 08:23:40 2006
New Revision: 479678
URL: http://svn.apache.org/viewvc?view=rev&rev=479678
Log:
ODE-83 Direct process to process invocation without going all the way back to the transport.
Added:
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/P2PMexContextImpl.java
Modified:
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java?view=diff&rev=479678&r1=479677&r2=479678
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java Mon Nov 27 08:23:40 2006
@@ -49,6 +49,7 @@
/**
* Acts as a service not provided by ODE. Used mainly for invocation as a way to
* maintain the WSDL decription of used services.
+ * @author Matthieu Riou <mriou at apache dot org>
*/
public class ExternalService implements PartnerRoleChannel {
@@ -188,5 +189,13 @@
public void setReplicateEmptyNS(boolean isReplicateEmptyNS) {
_isReplicateEmptyNS = isReplicateEmptyNS;
+ }
+
+ public String getPortName() {
+ return _portName;
+ }
+
+ public QName getServiceName() {
+ return _serviceName;
}
}
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=479678&r1=479677&r2=479678
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Mon Nov 27 08:23:40 2006
@@ -70,6 +70,7 @@
/**
* Server class called by our Axis hooks to handle all ODE lifecycle
* management.
+ * @author Matthieu Riou <mriou at apache dot org>
*/
public class ODEServer {
@@ -423,7 +424,8 @@
_server.setDaoConnectionFactory(_daoCF);
_server.setInMemDaoConnectionFactory(new org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl());
_server.setEndpointReferenceContext(new EndpointReferenceContextImpl(this));
- _server.setMessageExchangeContext(new MessageExchangeContextImpl(this));
+ _server.setMessageExchangeContext(
+ new P2PMexContextImpl(this, new MessageExchangeContextImpl(this), _executorService, _txMgr));
_server.setBindingContext(new BindingContextImpl(this, _store));
_server.setScheduler(_scheduler);
_server.setProcessStore(_store);
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=479678&r1=479677&r2=479678
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Mon Nov 27 08:23:40 2006
@@ -31,11 +31,7 @@
import org.apache.ode.bpel.epr.EndpointFactory;
import org.apache.ode.bpel.epr.MutableEndpoint;
import org.apache.ode.bpel.epr.WSAEndpoint;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.GUID;
@@ -57,11 +53,12 @@
/**
* A running service, encapsulates the Axis service, its receivers and our
* receivers as well.
+ * @author Matthieu Riou <mriou at apache dot org>
*/
public class ODEService {
private static final Log __log = LogFactory.getLog(ODEService.class);
- private static final int TIMEOUT = 2 * 60 * 1000;
+ public static final int TIMEOUT = 2 * 60 * 1000;
private AxisService _axisService;
private BpelServer _server;
@@ -297,7 +294,7 @@
return _axisService;
}
- class ResponseCallback {
+ static class ResponseCallback {
private MyRoleMessageExchange _mmex;
private boolean _timedout;
Added: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/P2PMexContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/P2PMexContextImpl.java?view=auto&rev=479678
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/P2PMexContextImpl.java (added)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/P2PMexContextImpl.java Mon Nov 27 08:23:40 2006
@@ -0,0 +1,217 @@
+package org.apache.ode.axis2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.GUID;
+import org.w3c.dom.Element;
+
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public class P2PMexContextImpl implements MessageExchangeContext {
+ private static final Log __log = LogFactory.getLog(P2PMexContextImpl.class);
+
+ private MessageExchangeContext _wrapped;
+ private ODEServer _server;
+ private ExecutorService _executorService;
+ private TransactionManager _txMgr;
+
+ private Map<String, ODEService.ResponseCallback> _waitingCallbacks =
+ new HashMap<String, ODEService.ResponseCallback>();
+
+
+ public P2PMexContextImpl(ODEServer server, MessageExchangeContext wrapped,
+ ExecutorService executorService, TransactionManager txMgr) {
+ _server = server;
+ _wrapped = wrapped;
+ _executorService = executorService;
+ _txMgr = txMgr;
+ }
+
+ public void invokePartner(final PartnerRoleMessageExchange mex) throws ContextException {
+ ExternalService target = (ExternalService) mex.getChannel();
+ ODEService myService = _server.getService(target.getServiceName(), target.getPortName());
+ if (myService != null) {
+ MyRoleMessageExchange odeMex = null;
+
+ // Starting a new transaction in a new thread
+ Future<MyRoleMessageExchange> futureMex = _executorService.submit(new Callable<MyRoleMessageExchange>() {
+ public MyRoleMessageExchange call() throws Exception {
+ MyRoleMessageExchange mmex = null;
+ Exception thrown;
+ int retryCount = 0;
+ do {
+ try {
+ _txMgr.begin();
+ mmex = buildAndSendMex(mex);
+ _txMgr.commit();
+ thrown = null;
+ } catch (Exception e) {
+ _txMgr.rollback();
+ retryCount++;
+ thrown = e;
+ }
+ } while (thrown != null && retryCount < 3);
+
+ if (thrown != null) throw thrown;
+ return mmex;
+ }
+ });
+
+ if (mex.getMessageExchangePattern() != MessageExchange.MessageExchangePattern.REQUEST_ONLY) {
+ // Partner MEX must be failed in the caller's transaction
+ try {
+ odeMex = futureMex.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof FailureException) {
+ FailureException fe = (FailureException)e.getCause();
+ failExchange(mex, fe._failureType, fe._explanation);
+ } else {
+ failExchange(mex, MessageExchange.FailureType.OTHER, "Message exchange " + odeMex +
+ " produced an unexpected exception while being processed by the engine!");
+ }
+ } catch (Exception e) {
+ failExchange(mex, MessageExchange.FailureType.OTHER, "Message exchange " + odeMex +
+ " produced an unexpected exception while being processed by the engine!");
+ }
+
+ if (odeMex != null) {
+ // Cleaning up
+ _waitingCallbacks.remove(odeMex.getMessageExchangeId());
+ try {
+ onResponse(mex, odeMex);
+ } catch (Exception e) {
+ failExchange(mex, MessageExchange.FailureType.FORMAT_ERROR,
+ "Got an exception when handling the response: " + e.toString());
+ }
+ }
+ }
+ } else {
+ _wrapped.invokePartner(mex);
+ }
+ }
+
+ public void onAsyncReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+ ODEService.ResponseCallback callback = _waitingCallbacks.get(myRoleMex.getClientId());
+ if (callback == null) {
+ __log.debug("No active service for message exchange " + myRoleMex + " on process to process interaction.");
+ _wrapped.onAsyncReply(myRoleMex);
+ } else {
+ callback.onResponse(myRoleMex);
+ _waitingCallbacks.remove(myRoleMex.getClientId());
+ }
+ }
+
+ private MyRoleMessageExchange buildAndSendMex(PartnerRoleMessageExchange pmex) throws FailureException {
+ ExternalService target = (ExternalService) pmex.getChannel();
+
+ // Creating message exchange
+ String messageId = new GUID().toString();
+
+ MyRoleMessageExchange odeMex = _server.getBpelServer().getEngine().createMessageExchange("" + messageId,
+ target.getServiceName(), pmex.getOperationName());
+ __log.debug("ODE routed to operation " + pmex.getOperationName() + " from service " + target.getServiceName());
+
+ ODEService.ResponseCallback callback = null;
+ if (odeMex.getOperation() != null) {
+ // Preparing message to send to ODE
+ Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
+ copyHeader(pmex, odeMex);
+ odeRequest.setMessage(pmex.getRequest().getMessage());
+
+ // Preparing a callback just in case we would need one.
+ if (odeMex.getOperation().getOutput() != null) {
+ callback = new ODEService.ResponseCallback();
+ _waitingCallbacks.put(odeMex.getClientId(), callback);
+ }
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("Invoking ODE using MEX " + odeMex);
+ __log.debug("Message content: " + DOMUtils.domToString(odeRequest.getMessage()));
+ }
+
+ // Invoking ODE
+ odeMex.invoke(odeRequest);
+
+ boolean timeout = false;
+ // Invocation response could be delayed, if so we have to wait for it.
+ if (odeMex.getMessageExchangePattern() == MessageExchange.MessageExchangePattern.REQUEST_RESPONSE &&
+ odeMex.getStatus() == MessageExchange.Status.ASYNC) {
+ odeMex = callback.getResponse(ODEService.TIMEOUT);
+ if (odeMex == null) timeout = true;
+ }
+
+ if (timeout) {
+ __log.error("Timeout when waiting for response to MEX " + odeMex);
+ throw new FailureException(MessageExchange.FailureType.ABORTED, "Timeout after " +
+ ODEService.TIMEOUT + "ms");
+ }
+ } else {
+ // Somethings's wrong
+ throw new FailureException(MessageExchange.FailureType.UNKNOWN_OPERATION, "Operation to invoke is null!");
+ }
+
+ return odeMex;
+ }
+
+ private void onResponse(PartnerRoleMessageExchange mex, MyRoleMessageExchange responseMex) {
+ switch (responseMex.getStatus()) {
+ case FAULT:
+ mex.replyWithFault(responseMex.getFault(), responseMex.getFaultResponse());
+ case ASYNC:
+ case RESPONSE:
+ Element responseElmt = responseMex.getResponse().getMessage();
+ if (__log.isDebugEnabled()) __log.debug("Received response message " +
+ responseElmt == null ? "null" : DOMUtils.domToString(responseElmt));
+ Message response = mex.createMessage(responseMex.getOperation().getOutput().getMessage().getQName());
+ __log.debug("Received synchronous response for MEX " + responseMex);
+ __log.debug("Message: " + DOMUtils.domToString(responseElmt));
+ response.setMessage(responseElmt);
+ mex.reply(response);
+ break;
+ case FAILURE:
+ failExchange(mex, MessageExchange.FailureType.COMMUNICATION_ERROR, "The called process failed " +
+ "in a process to process interaction.");
+ break;
+ default:
+ failExchange(mex, MessageExchange.FailureType.NO_RESPONSE, "The called process failed " +
+ "to respond properly in a process to process interaction.");
+ __log.warn("Received ODE message exchange in unexpected state: " + mex.getStatus());
+ }
+ }
+
+ private void copyHeader(MessageExchange source, MessageExchange dest) {
+ if (source.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID) != null)
+ dest.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID,
+ source.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID));
+ if (source.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID) != null)
+ dest.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID,
+ source.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID));
+ }
+
+ private void failExchange(PartnerRoleMessageExchange mex, MessageExchange.FailureType failure, String explanation) {
+ __log.error("Failure while sending mex " + mex + ": " + explanation);
+ mex.replyWithFailure(failure, explanation, null);
+ }
+
+ private static class FailureException extends Exception {
+ public MessageExchange.FailureType _failureType;
+ public String _explanation;
+
+ public FailureException(MessageExchange.FailureType failureType, String explanation) {
+ super(explanation);
+ _explanation = explanation;
+ _failureType = failureType;
+ }
+ }
+}