You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/09/06 11:01:48 UTC

svn commit: r573187 - in /webservices/synapse/trunk/java/modules: core/src/main/java/org/apache/synapse/ core/src/main/java/org/apache/synapse/core/axis2/ transports/src/main/java/org/apache/axis2/transport/base/ transports/src/main/java/org/apache/axi...

Author: asankha
Date: Thu Sep  6 02:01:47 2007
New Revision: 573187

URL: http://svn.apache.org/viewvc?rev=573187&view=rev
Log:
Support marking messages as out-only using
<syn:property name="OUT_ONLY" value="true"/>

Also invert the value set for the transport non-blocking property (MessageContext.TRANSPORT_NON_BLOCKING), as Axis2 interprets it in the opposite sense

Enhance the JMS transport to send binary or text payloads without a SOAP envelope (when the message was originally created from a non-SOAP payload)

Modified:
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java Thu Sep  6 02:01:47 2007
@@ -127,6 +127,8 @@
 
     String RESPONSE = "RESPONSE";
 
+    String OUT_ONLY = "OUT_ONLY";
+
     /** The tracing state -off */
     int TRACING_OFF =0;
     /** The tracing state-on */

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java Thu Sep  6 02:01:47 2007
@@ -21,9 +21,9 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.wsdl.WSDLConstants;
-import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisMessage;
+import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,7 +47,8 @@
     private static final String SEC_AND_ADDR    = "__SEC_AND_ADDR__";
     private static final String RM_SEC_AND_ADDR = "__RM_SEC_AND_ADDR__";
 
-    public static final String DYNAMIC_OPERATION  = "__DYNAMIC_OPERATION__";
+    public static final String OUT_IN_OPERATION = "__OUT_IN_OPERATION__";
+    public static final String OUT_ONLY_OPERATION = "__OUT_ONLY_OPERATION__";
 
     private static SynapseCallbackReceiver synapseCallbackReceiver = null;
 
@@ -130,7 +131,7 @@
 
         try {
             DynamicAxisOperation dynamicOperation =
-                new DynamicAxisOperation(new QName(DYNAMIC_OPERATION));
+                new DynamicAxisOperation(new QName(OUT_IN_OPERATION));
             dynamicOperation.setMessageReceiver(getCallbackReceiver(synCfg));
             AxisMessage inMsg = new AxisMessage();
             inMsg.setName("in-message");
@@ -140,8 +141,18 @@
             outMsg.setParent(dynamicOperation);
             dynamicOperation.addMessage(inMsg, WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
             dynamicOperation.addMessage(outMsg, WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+
+            OutOnlyAxisOperation asyncOperation =
+                new OutOnlyAxisOperation(new QName(OUT_ONLY_OPERATION));
+            asyncOperation.setMessageReceiver(getCallbackReceiver(synCfg));
+            AxisMessage outOnlyMsg = new AxisMessage();
+            outOnlyMsg.setName("out-message");
+            outOnlyMsg.setParent(asyncOperation);
+            asyncOperation.addMessage(outMsg, WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+
             AxisService axisAnonymousService  = new AxisService(serviceKey);
             axisAnonymousService.addOperation(dynamicOperation);
+            axisAnonymousService.addOperation(asyncOperation);
             axisCfg.addService(axisAnonymousService);
             axisCfg.getPhasesInfo().setOperationPhases(dynamicOperation);
             return axisAnonymousService;

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java Thu Sep  6 02:01:47 2007
@@ -175,9 +175,14 @@
             axisCfgCtx, (AxisServiceGroup) anoymousService.getParent());
         ServiceContext serviceCtx = sgc.getServiceContext(anoymousService);
 
+        boolean outOnlyMessage = "true".equals(
+            synapseOutMessageContext.getProperty(Constants.OUT_ONLY));
+
         // get a reference to the DYNAMIC operation of the Anonymous Axis2 service
         AxisOperation axisAnonymousOperation = anoymousService.getOperation(
-            new QName(AnonymousServiceFactory.DYNAMIC_OPERATION));
+            outOnlyMessage ?
+                new QName(AnonymousServiceFactory.OUT_ONLY_OPERATION) :
+                new QName(AnonymousServiceFactory.OUT_IN_OPERATION));
 
         Options clientOptions = new Options();
         clientOptions.setUseSeparateListener(separateListener);
@@ -210,25 +215,27 @@
                 fac.createSOAPHeader(axisOutMsgCtx.getEnvelope());
             }
         }
-        OperationClient mepClient = axisAnonymousOperation.createClient(
-            serviceCtx, clientOptions);
+
+        OperationClient mepClient = axisAnonymousOperation.createClient(serviceCtx, clientOptions);
         mepClient.addMessageContext(axisOutMsgCtx);
         axisOutMsgCtx.setAxisMessage(
             axisAnonymousOperation.getMessage(WSDLConstants.MESSAGE_LABEL_OUT_VALUE));
 
-        // always set a callback as we decide if the send it blocking or non blocking within
-        // the MEP client. This does not cause an overhead, as we simply create a 'holder'
-        // object with a reference to the outgoing synapse message context synapseOutMessageContext
-        AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
-        if (endpoint != null) {
-            // set the timeout time and the timeout action to the callback, so that the TimeoutHandler
-            // can detect timed out callbacks and take approprite action.
-            callback.setTimeOutOn(System.currentTimeMillis() + endpoint.getTimeoutDuration());
-            callback.setTimeOutAction(endpoint.getTimeoutAction());
+        if (!outOnlyMessage) {
+            // always set a callback as we decide if the send is blocking or non blocking within
+            // the MEP client. This does not cause an overhead, as we simply create a 'holder'
+            // object with a reference to the outgoing synapse message context synapseOutMessageContext
+            AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
+            if (endpoint != null) {
+                // set the timeout time and the timeout action to the callback, so that the TimeoutHandler
+                // can detect timed out callbacks and take appropriate action.
+                callback.setTimeOutOn(System.currentTimeMillis() + endpoint.getTimeoutDuration());
+                callback.setTimeOutAction(endpoint.getTimeoutAction());
+            }
+            mepClient.setCallback(callback);
         }
-        mepClient.setCallback(callback);
 
-        mepClient.execute(false);
+        mepClient.execute(true);
 
         // with the nio transport, this causes the listener not to write a 202
         // Accepted response, as this implies that Synapse does not yet know if

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java Thu Sep  6 02:01:47 2007
@@ -160,7 +160,9 @@
         msgCtx.setServerSide(true);
         msgCtx.setMessageID(UUIDGenerator.getUUID());
 
-        msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(isNonBlocking));
+        // There is a discrepancy with what I expected: Axis2 spawns a new thread to
+        // send a message if this is TRUE - and I want it to be the other way around
+        msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(!isNonBlocking));
 
         // are these relevant?
         //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Thu Sep  6 02:01:47 2007
@@ -16,6 +16,9 @@
 package org.apache.axis2.transport.jms;
 
 import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.om.OMNode;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.ConfigurationContext;
@@ -31,6 +34,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import javax.jms.*;
+import javax.activation.DataHandler;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.*;
@@ -238,38 +242,72 @@
         Message message = null;
         String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
 
-        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
-        MessageFormatter messageFormatter = null;
-        try {
-            messageFormatter = TransportUtils.getMessageFormatter(msgContext);
-        } catch (AxisFault axisFault) {
-            throw new JMSException("Unable to get the message formatter to use");
-        }
+        // check the first element of the SOAP body, do we have content wrapped using the
+        // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
+        // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
+        // for JMS but just get the payload in its native format
+        String jmsPayloadType = guessMessageType(msgContext);
+
+        if (jmsPayloadType == null) {
+
+            OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+            MessageFormatter messageFormatter = null;
+            try {
+                messageFormatter = TransportUtils.getMessageFormatter(msgContext);
+            } catch (AxisFault axisFault) {
+                throw new JMSException("Unable to get the message formatter to use");
+            }
 
-        String contentType = messageFormatter.getContentType(
-            msgContext, format, msgContext.getSoapAction());
+            String contentType = messageFormatter.getContentType(
+                msgContext, format, msgContext.getSoapAction());
 
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try {
-            messageFormatter.writeTo(msgContext, format, baos, true);
-            baos.flush();
-        } catch (IOException e) {
-            handleException("IO Error while creating BytesMessage", e);
-        }
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try {
+                messageFormatter.writeTo(msgContext, format, baos, true);
+                baos.flush();
+            } catch (IOException e) {
+                handleException("IO Error while creating BytesMessage", e);
+            }
 
-        if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
-            contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
+            if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
+                contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
+                message = session.createBytesMessage();
+                BytesMessage bytesMsg = (BytesMessage) message;
+                bytesMsg.writeBytes(baos.toByteArray());
+            } else {
+                message = session.createTextMessage();  // default
+                TextMessage txtMsg = (TextMessage) message;
+                txtMsg.setText(new String(baos.toByteArray()));
+            }
+            message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType);
+
+        } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
             message = session.createBytesMessage();
             BytesMessage bytesMsg = (BytesMessage) message;
-            bytesMsg.writeBytes(baos.toByteArray());
-        } else {
-            message = session.createTextMessage();  // default
+            OMElement wrapper = msgContext.getEnvelope().getBody().
+                getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER);
+            OMNode omNode = wrapper.getFirstOMChild();
+            if (omNode != null && omNode instanceof OMText) {
+                Object dh = ((OMText) omNode).getDataHandler();
+                if (dh != null && dh instanceof DataHandler) {
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    try {
+                        ((DataHandler) dh).writeTo(baos);
+                    } catch (IOException e) {
+                        handleException("Error serializing binary content of element : " +
+                            BaseConstants.DEFAULT_BINARY_WRAPPER, e);
+                    }
+                    bytesMsg.writeBytes(baos.toByteArray());
+                }
+            }
+
+        } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
+            message = session.createTextMessage();
             TextMessage txtMsg = (TextMessage) message;
-            txtMsg.setText(new String(baos.toByteArray()));
+            txtMsg.setText(msgContext.getEnvelope().getBody().
+                getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
         }
 
-        message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType);
-
         // set the JMS correlation ID if specified
         String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
         if (correlationId == null && msgContext.getRelatesTo() != null) {
@@ -292,6 +330,23 @@
 
         JMSUtils.setTransportHeaders(msgContext, message);
         return message;
+    }
+
+    /**
+     * Guess the JMS message type to use by inspecting the first element of the SOAP body
+     * @param msgContext the message context whose envelope body is examined
+     * @return JMSConstants.JMS_BYTE_MESSAGE (binary wrapper), JMSConstants.JMS_TEXT_MESSAGE (text wrapper) or null
+     */
+    private String guessMessageType(MessageContext msgContext) {
+        OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
+        if (firstChild != null) {
+            if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
+                return JMSConstants.JMS_BYTE_MESSAGE;
+            } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
+                return JMSConstants.JMS_TEXT_MESSAGE;
+            }
+        }
+        return null;
     }
 
     /**

Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java Thu Sep  6 02:01:47 2007
@@ -117,7 +117,9 @@
     private MessageContext createMessageContext(HttpRequest request) {
 
         MessageContext msgContext = new MessageContext();
-        msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.TRUE);
+        // There is a discrepancy with what I expected: Axis2 spawns a new thread to
+        // send a message if this is TRUE - and I want it to be the other way around
+        msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
         msgContext.setConfigurationContext(cfgCtx);
         if (isHttps) {
             msgContext.setTransportOut(cfgCtx.getAxisConfiguration()



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org