You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/07/07 18:27:47 UTC

svn commit: r419919 - in /incubator/ode/scratch/pxe-iapi: axis/src/main/java/com/fs/pxe/axis/ axis/src/main/java/com/fs/pxe/axis/epr/ axis/src/main/java/com/fs/pxe/axis/hooks/ bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/

Author: mriou
Date: Fri Jul  7 09:27:44 2006
New Revision: 419919

URL: http://svn.apache.org/viewvc?rev=419919&view=rev
Log:
The Axis2 IAPI implementation seems to work properly. Reception, invocation and reply have been implemented, and all the services the engine needs are in place (transaction, executor, ...). Endpoint format and assignment are still messy, but as long as you don't assign partner links, it should work (more on this coming next).

Added:
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/SOAPUtils.java
Modified:
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/AxisInvoker.java
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/MessageExchangeContextImpl.java
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEServer.java
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEService.java
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/epr/WSDL11Endpoint.java
    incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/hooks/PXEMessageReceiver.java
    incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java

Modified: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/AxisInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/AxisInvoker.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/AxisInvoker.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/AxisInvoker.java Fri Jul  7 09:27:44 2006
@@ -4,7 +4,7 @@
 import com.fs.pxe.bpel.iapi.Message;
 import com.fs.pxe.bpel.iapi.MessageExchange;
 import com.fs.pxe.bpel.iapi.PartnerRoleMessageExchange;
-import com.fs.pxe.bpel.scheduler.quartz.QuartzSchedulerImpl;
+import com.fs.utils.DOMUtils;
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
@@ -14,8 +14,12 @@
 import org.apache.axis2.client.async.Callback;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * Invoke external services using Axis2 and eventually gets the service
@@ -25,28 +29,53 @@
 
   private static final Log __log = LogFactory.getLog(AxisInvoker.class);
 
-  private QuartzSchedulerImpl _scheduler;
+  private ExecutorService _executorService;
 
-  public AxisInvoker(QuartzSchedulerImpl scheduler) {
-    _scheduler = scheduler;
+  public AxisInvoker(ExecutorService executorService) {
+    _executorService = executorService;
   }
 
-  public void invokePartner(PartnerRoleMessageExchange pxeMex) {
+  public void invokePartner(final PartnerRoleMessageExchange pxeMex) {
     boolean isTwoWay = pxeMex.getMessageExchangePattern() ==
             com.fs.pxe.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
     try {
-      OMElement payload = OMUtils.toOM(pxeMex.getRequest().getMessage());
+      Document doc = DOMUtils.newDocument();
+      Element op = doc.createElement(pxeMex.getOperationName());
+      op.appendChild(doc.importNode(DOMUtils.getFirstChildElement(pxeMex.getRequest().getMessage()), true));
+
+      final OMElement payload = OMUtils.toOM(op);
 
       Options options = new Options();
       EndpointReference axisEPR = new EndpointReference(((MutableEndpoint)pxeMex.getEndpointReference()).getUrl());
+      __log.debug("Axis2 sending message to " + axisEPR.getAddress() + " using MEX " + pxeMex);
+      __log.debug("Message: " + payload);
       options.setTo(axisEPR);
 
-      ServiceClient serviceClient = new ServiceClient();
+      final ServiceClient serviceClient = new ServiceClient();
       serviceClient.setOptions(options);
 
-      if (isTwoWay)
-        serviceClient.sendReceiveNonBlocking(payload, new AxisResponseCallback(pxeMex));
-      else
+      if (isTwoWay) {
+        // Invoke in a separate thread, even though we still wait for the synchronous reply,
+        // to force a clear transaction separation.
+        Future<OMElement> freply = _executorService.submit(new Callable<OMElement>() {
+          public OMElement call() throws Exception {
+            return serviceClient.sendReceive(payload);
+          }
+        });
+        OMElement reply = null;
+        try {
+          reply = freply.get();
+        } catch (Exception e) {
+          __log.error("We've been interrupted while waiting for reply to MEX " + pxeMex + "!!!");
+        }
+
+        final Message response = pxeMex.createMessage(pxeMex.getOperation().getOutput().getMessage().getQName());
+        Element responseElmt = OMUtils.toDOM(reply);
+        __log.debug("Received synchronous response for MEX " + pxeMex);
+        __log.debug("Message: " + DOMUtils.domToString(responseElmt));
+        response.setMessage(OMUtils.toDOM(reply));
+        pxeMex.reply(response);
+      } else
         serviceClient.sendRobust(payload);
     } catch (AxisFault axisFault) {
       String errmsg = "Error sending message to Axis2 for PXE mex " + pxeMex;
@@ -55,6 +84,9 @@
     }
   }
 
+  // This code can be used later on if we start using Axis2 sendNonBlocking but for now
+  // we have to block the calling thread as the engine expects an immediate (non delayed)
+  // response.
   private class AxisResponseCallback extends Callback {
 
     private PartnerRoleMessageExchange _pxeMex;
@@ -65,14 +97,11 @@
 
     public void onComplete(AsyncResult asyncResult) {
       final Message response = _pxeMex.createMessage(_pxeMex.getOperation().getOutput().getMessage().getQName());
+      if (__log.isDebugEnabled())
+        __log.debug("Received a synchronous response for service invocation on MEX " + _pxeMex);
       try {
         response.setMessage(OMUtils.toDOM(asyncResult.getResponseEnvelope().getBody()));
-        _scheduler.execTransaction(new Callable<Object>() {
-          public Object call() throws Exception {
-            _pxeMex.reply(response);
-            return null;
-          }
-        });
+        _pxeMex.reply(response);
       } catch (AxisFault axisFault) {
         __log.error("Error translating message.", axisFault);
         _pxeMex.replyWithFailure(MessageExchange.FailureType.FORMAT_ERROR, axisFault.getMessage(), null);
@@ -82,14 +111,11 @@
     }
 
     public void onError(final Exception exception) {
+      if (__log.isDebugEnabled())
+        __log.debug("Received a synchronous failure for service invocation on MEX " + _pxeMex);
       try {
-        _scheduler.execTransaction(new Callable<Object>() {
-          public Object call() throws Exception {
-            _pxeMex.replyWithFailure(MessageExchange.FailureType.OTHER,
-                    "Error received from invoked service: " + exception.toString(), null);
-            return null;
-          }
-        });
+        _pxeMex.replyWithFailure(MessageExchange.FailureType.OTHER,
+                "Error received from invoked service: " + exception.toString(), null);
       } catch (Exception e) {
         __log.error("Error delivering failure.", e);
       }

Modified: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/MessageExchangeContextImpl.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/MessageExchangeContextImpl.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/MessageExchangeContextImpl.java Fri Jul  7 09:27:44 2006
@@ -26,10 +26,14 @@
   }
 
   public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException {
+    if (__log.isDebugEnabled())
+      __log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName());
     _invoker.invokePartner(partnerRoleMessageExchange);
   }
 
   public void onAsyncReply(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException {
+    if (__log.isDebugEnabled())
+      __log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName());
     PXEService service = _server.getService(myRoleMessageExchange.getServiceName());
     service.notifyResponse(myRoleMessageExchange);
   }

Modified: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEServer.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEServer.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEServer.java Fri Jul  7 09:27:44 2006
@@ -189,7 +189,7 @@
   }
 
   public AxisInvoker createInvoker() {
-    AxisInvoker invoker = new AxisInvoker(_scheduler);
+    AxisInvoker invoker = new AxisInvoker(_executorService);
     return invoker;
   }
 

Modified: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEService.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEService.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/PXEService.java Fri Jul  7 09:27:44 2006
@@ -3,6 +3,7 @@
 import com.fs.pxe.bpel.engine.BpelServerImpl;
 import com.fs.pxe.bpel.iapi.Message;
 import com.fs.pxe.bpel.iapi.MyRoleMessageExchange;
+import com.fs.pxe.bpel.iapi.MessageExchange;
 import com.fs.utils.DOMUtils;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPEnvelope;
@@ -68,6 +69,8 @@
           _waitingCallbacks.put(pxeMex.getMessageExchangeId(), callback);
         }
 
+        if (__log.isDebugEnabled())
+          __log.debug("Invoking PXE.");
         pxeMex.invoke(pxeRequest);
 
         // Handle the response if necessary.
@@ -76,8 +79,12 @@
           SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
           outMsgContext.setEnvelope(envelope);
           // Waiting for a callback
-          pxeMex = callback.getResponse(TIMEOUT);
+          if (_waitingCallbacks.get(pxeMex.getMessageExchangeId()) != null
+                  && pxeMex.getStatus() == MessageExchange.Status.ASYNC)
+            pxeMex = callback.getResponse(TIMEOUT);
+          
           // Hopefully we have a response
+          __log.debug("Handling response for MEX " + pxeMex);
           onResponse(pxeMex, envelope);
         } else {
           __log.debug("PXE MEX " + pxeMex + " completed ASYNCHRONOUSLY.");

Added: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/SOAPUtils.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/SOAPUtils.java?rev=419919&view=auto
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/SOAPUtils.java (added)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/SOAPUtils.java Fri Jul  7 09:27:44 2006
@@ -0,0 +1,50 @@
+package com.fs.pxe.axis;
+
+import com.fs.utils.stl.CollectionsX;
+
+import javax.wsdl.Binding;
+import javax.wsdl.Port;
+import javax.wsdl.extensions.soap.SOAPBinding;
+import java.util.Collection;
+
+import org.w3c.dom.Element;
+
+/**
+ * Utility class to handle SOAP messages wrapping/unwrapping depending
+ * on binding style Document vs. RPC.
+ */
+public class SOAPUtils {
+
+//  public void wrap(Element msg, Port port) {
+//
+//  }
+//
+//  private boolean isRPC(Port wsdlPort) {
+//    if (wsdlPort == null)
+//      throw new NullPointerException("null wsdlPort");
+//
+//    Binding binding = wsdlPort.getBinding();
+//
+//    if (binding == null)
+//      throw new SoapBindingException(wsdlPort + " is missing <wsdl:binding>",
+//              "port", wsdlPort.getName(),
+//              __msgs.msgNoBindingForPort());
+//
+//    Collection soapBindings = CollectionsX.filter(binding.getExtensibilityElements(), SOAPBinding.class);
+//    if (soapBindings.isEmpty()) {
+//      throw new SoapBindingException(wsdlPort + " is missing <soapbind:binding>",
+//              "port", wsdlPort.getName(),
+//              __msgs.msgNoSoapBindingForPort());
+//    }
+//
+//    else if (soapBindings.size() > 1) {
+//      throw new SoapBindingException(wsdlPort + " has multiple <soapbind:binding> elements!",
+//              "port", wsdlPort.getName(),
+//              __msgs.msgMultipleSoapBindingsForPort());
+//    }
+//
+//    SOAPBinding soapBinding = (SOAPBinding) soapBindings.iterator().next();
+//    String style  = soapBinding.getStyle();
+//    return style != null && style.equals("rpc");
+//  }
+}

Modified: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/epr/WSDL11Endpoint.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/epr/WSDL11Endpoint.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/epr/WSDL11Endpoint.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/epr/WSDL11Endpoint.java Fri Jul  7 09:27:44 2006
@@ -21,7 +21,10 @@
   }
 
   public String getUrl() {
-    return _serviceElmt.getElementsByTagNameNS(Namespaces.SOAP_NS, "address").item(0).getTextContent();
+    System.out.println("### Getting endpoint " + DOMUtils.domToString(_serviceElmt));
+    Element port = (Element) _serviceElmt.getElementsByTagNameNS(Namespaces.WSDL_11, "port").item(0);
+    Element address = (Element) port.getElementsByTagNameNS(Namespaces.SOAP_NS, "address").item(0);
+    return address.getAttribute("location");
   }
 
   public boolean accept(Node node) {

Modified: incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/hooks/PXEMessageReceiver.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/hooks/PXEMessageReceiver.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/hooks/PXEMessageReceiver.java (original)
+++ incubator/ode/scratch/pxe-iapi/axis/src/main/java/com/fs/pxe/axis/hooks/PXEMessageReceiver.java Fri Jul  7 09:27:44 2006
@@ -23,6 +23,9 @@
   private ExecutorService _executorService;
 
   public final void receive(final MessageContext msgContext) throws AxisFault {
+    if (__log.isDebugEnabled())
+      __log.debug("Received message for " + msgContext.getAxisService().getName() +
+              "." + msgContext.getAxisOperation().getName());
     if (hasResponse(msgContext.getAxisOperation())) {
       // Expecting a response, running in the same thread
       MessageContext outMsgContext = Utils.createOutMessageContext(msgContext);

Modified: incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java?rev=419919&r1=419918&r2=419919&view=diff
==============================================================================
--- incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/scratch/pxe-iapi/bpel-runtime/src/main/java/com/fs/pxe/bpel/engine/BpelProcess.java Fri Jul  7 09:27:44 2006
@@ -33,6 +33,7 @@
 import com.fs.pxe.bpel.runtime.msgs.Messages;
 import com.fs.utils.ArrayUtils;
 import com.fs.utils.ObjectPrinter;
+import com.fs.utils.DOMUtils;
 import com.fs.utils.msg.MessageBundle;
 
 import java.io.*;