You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/09/06 11:01:48 UTC
svn commit: r573187 - in /webservices/synapse/trunk/java/modules:
core/src/main/java/org/apache/synapse/
core/src/main/java/org/apache/synapse/core/axis2/
transports/src/main/java/org/apache/axis2/transport/base/
transports/src/main/java/org/apache/axi...
Author: asankha
Date: Thu Sep 6 02:01:47 2007
New Revision: 573187
URL: http://svn.apache.org/viewvc?rev=573187&view=rev
Log:
support messages to be marked as out-only using
<syn:property name="OUT_ONLY" value="true"/>
Also reverse the transportIsBlocking as Axis2 uses it in its reverse form
enhance JMS transport to send binary or text payloads without a soap envelope (where they were created out of non-soap messages)
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Constants.java Thu Sep 6 02:01:47 2007
@@ -127,6 +127,8 @@
String RESPONSE = "RESPONSE";
+ String OUT_ONLY = "OUT_ONLY";
+
/** The tracing state -off */
int TRACING_OFF =0;
/** The tracing state-on */
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java Thu Sep 6 02:01:47 2007
@@ -21,9 +21,9 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.wsdl.WSDLConstants;
-import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisMessage;
+import org.apache.axis2.description.OutOnlyAxisOperation;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,7 +47,8 @@
private static final String SEC_AND_ADDR = "__SEC_AND_ADDR__";
private static final String RM_SEC_AND_ADDR = "__RM_SEC_AND_ADDR__";
- public static final String DYNAMIC_OPERATION = "__DYNAMIC_OPERATION__";
+ public static final String OUT_IN_OPERATION = "__OUT_IN_OPERATION__";
+ public static final String OUT_ONLY_OPERATION = "__OUT_ONLY_OPERATION__";
private static SynapseCallbackReceiver synapseCallbackReceiver = null;
@@ -130,7 +131,7 @@
try {
DynamicAxisOperation dynamicOperation =
- new DynamicAxisOperation(new QName(DYNAMIC_OPERATION));
+ new DynamicAxisOperation(new QName(OUT_IN_OPERATION));
dynamicOperation.setMessageReceiver(getCallbackReceiver(synCfg));
AxisMessage inMsg = new AxisMessage();
inMsg.setName("in-message");
@@ -140,8 +141,18 @@
outMsg.setParent(dynamicOperation);
dynamicOperation.addMessage(inMsg, WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
dynamicOperation.addMessage(outMsg, WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+
+ OutOnlyAxisOperation asyncOperation =
+ new OutOnlyAxisOperation(new QName(OUT_ONLY_OPERATION));
+ asyncOperation.setMessageReceiver(getCallbackReceiver(synCfg));
+ AxisMessage outOnlyMsg = new AxisMessage();
+ outOnlyMsg.setName("out-message");
+ outOnlyMsg.setParent(asyncOperation);
+ asyncOperation.addMessage(outMsg, WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+
AxisService axisAnonymousService = new AxisService(serviceKey);
axisAnonymousService.addOperation(dynamicOperation);
+ axisAnonymousService.addOperation(asyncOperation);
axisCfg.addService(axisAnonymousService);
axisCfg.getPhasesInfo().setOperationPhases(dynamicOperation);
return axisAnonymousService;
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java Thu Sep 6 02:01:47 2007
@@ -175,9 +175,14 @@
axisCfgCtx, (AxisServiceGroup) anoymousService.getParent());
ServiceContext serviceCtx = sgc.getServiceContext(anoymousService);
+ boolean outOnlyMessage = "true".equals(
+ synapseOutMessageContext.getProperty(Constants.OUT_ONLY));
+
// get a reference to the DYNAMIC operation of the Anonymous Axis2 service
AxisOperation axisAnonymousOperation = anoymousService.getOperation(
- new QName(AnonymousServiceFactory.DYNAMIC_OPERATION));
+ outOnlyMessage ?
+ new QName(AnonymousServiceFactory.OUT_ONLY_OPERATION) :
+ new QName(AnonymousServiceFactory.OUT_IN_OPERATION));
Options clientOptions = new Options();
clientOptions.setUseSeparateListener(separateListener);
@@ -210,25 +215,27 @@
fac.createSOAPHeader(axisOutMsgCtx.getEnvelope());
}
}
- OperationClient mepClient = axisAnonymousOperation.createClient(
- serviceCtx, clientOptions);
+
+ OperationClient mepClient = axisAnonymousOperation.createClient(serviceCtx, clientOptions);
mepClient.addMessageContext(axisOutMsgCtx);
axisOutMsgCtx.setAxisMessage(
axisAnonymousOperation.getMessage(WSDLConstants.MESSAGE_LABEL_OUT_VALUE));
- // always set a callback as we decide if the send it blocking or non blocking within
- // the MEP client. This does not cause an overhead, as we simply create a 'holder'
- // object with a reference to the outgoing synapse message context synapseOutMessageContext
- AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
- if (endpoint != null) {
- // set the timeout time and the timeout action to the callback, so that the TimeoutHandler
- // can detect timed out callbacks and take approprite action.
- callback.setTimeOutOn(System.currentTimeMillis() + endpoint.getTimeoutDuration());
- callback.setTimeOutAction(endpoint.getTimeoutAction());
+ if (!outOnlyMessage) {
+ // always set a callback as we decide if the send it blocking or non blocking within
+ // the MEP client. This does not cause an overhead, as we simply create a 'holder'
+ // object with a reference to the outgoing synapse message context synapseOutMessageContext
+ AsyncCallback callback = new AsyncCallback(synapseOutMessageContext);
+ if (endpoint != null) {
+ // set the timeout time and the timeout action to the callback, so that the TimeoutHandler
+ // can detect timed out callbacks and take approprite action.
+ callback.setTimeOutOn(System.currentTimeMillis() + endpoint.getTimeoutDuration());
+ callback.setTimeOutAction(endpoint.getTimeoutAction());
+ }
+ mepClient.setCallback(callback);
}
- mepClient.setCallback(callback);
- mepClient.execute(false);
+ mepClient.execute(true);
// with the nio transport, this causes the listener not to write a 202
// Accepted response, as this implies that Synapse does not yet know if
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java Thu Sep 6 02:01:47 2007
@@ -160,7 +160,9 @@
msgCtx.setServerSide(true);
msgCtx.setMessageID(UUIDGenerator.getUUID());
- msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(isNonBlocking));
+ // There is a discrepency in what I thought, Axis2 spawns a nes threads to
+ // send a message is this is TRUE - and I want it to be the other way
+ msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(!isNonBlocking));
// are these relevant?
//msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Thu Sep 6 02:01:47 2007
@@ -16,6 +16,9 @@
package org.apache.axis2.transport.jms;
import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.om.OMNode;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ConfigurationContext;
@@ -31,6 +34,7 @@
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
+import javax.activation.DataHandler;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*;
@@ -238,38 +242,72 @@
Message message = null;
String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
- OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
- MessageFormatter messageFormatter = null;
- try {
- messageFormatter = TransportUtils.getMessageFormatter(msgContext);
- } catch (AxisFault axisFault) {
- throw new JMSException("Unable to get the message formatter to use");
- }
+ // check the first element of the SOAP body, do we have content wrapped using the
+ // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
+ // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
+ // for JMS but just get the payload in its native format
+ String jmsPayloadType = guessMessageType(msgContext);
+
+ if (jmsPayloadType == null) {
+
+ OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+ MessageFormatter messageFormatter = null;
+ try {
+ messageFormatter = TransportUtils.getMessageFormatter(msgContext);
+ } catch (AxisFault axisFault) {
+ throw new JMSException("Unable to get the message formatter to use");
+ }
- String contentType = messageFormatter.getContentType(
- msgContext, format, msgContext.getSoapAction());
+ String contentType = messageFormatter.getContentType(
+ msgContext, format, msgContext.getSoapAction());
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- messageFormatter.writeTo(msgContext, format, baos, true);
- baos.flush();
- } catch (IOException e) {
- handleException("IO Error while creating BytesMessage", e);
- }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ messageFormatter.writeTo(msgContext, format, baos, true);
+ baos.flush();
+ } catch (IOException e) {
+ handleException("IO Error while creating BytesMessage", e);
+ }
- if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
- contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
+ if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
+ contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
+ message = session.createBytesMessage();
+ BytesMessage bytesMsg = (BytesMessage) message;
+ bytesMsg.writeBytes(baos.toByteArray());
+ } else {
+ message = session.createTextMessage(); // default
+ TextMessage txtMsg = (TextMessage) message;
+ txtMsg.setText(new String(baos.toByteArray()));
+ }
+ message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType);
+
+ } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
message = session.createBytesMessage();
BytesMessage bytesMsg = (BytesMessage) message;
- bytesMsg.writeBytes(baos.toByteArray());
- } else {
- message = session.createTextMessage(); // default
+ OMElement wrapper = msgContext.getEnvelope().getBody().
+ getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER);
+ OMNode omNode = wrapper.getFirstOMChild();
+ if (omNode != null && omNode instanceof OMText) {
+ Object dh = ((OMText) omNode).getDataHandler();
+ if (dh != null && dh instanceof DataHandler) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ ((DataHandler) dh).writeTo(baos);
+ } catch (IOException e) {
+ handleException("Error serializing binary content of element : " +
+ BaseConstants.DEFAULT_BINARY_WRAPPER, e);
+ }
+ bytesMsg.writeBytes(baos.toByteArray());
+ }
+ }
+
+ } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
+ message = session.createTextMessage();
TextMessage txtMsg = (TextMessage) message;
- txtMsg.setText(new String(baos.toByteArray()));
+ txtMsg.setText(msgContext.getEnvelope().getBody().
+ getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
}
- message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType);
-
// set the JMS correlation ID if specified
String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
if (correlationId == null && msgContext.getRelatesTo() != null) {
@@ -292,6 +330,23 @@
JMSUtils.setTransportHeaders(msgContext, message);
return message;
+ }
+
+ /**
+ * Guess the message type to use for JMS looking at the message contexts' envelope
+ * @param msgContext the message context
+ * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null
+ */
+ private String guessMessageType(MessageContext msgContext) {
+ OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
+ if (firstChild != null) {
+ if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
+ return JMSConstants.JMS_BYTE_MESSAGE;
+ } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
+ return JMSConstants.JMS_TEXT_MESSAGE;
+ }
+ }
+ return null;
}
/**
Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java?rev=573187&r1=573186&r2=573187&view=diff
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java (original)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/nhttp/ServerWorker.java Thu Sep 6 02:01:47 2007
@@ -117,7 +117,9 @@
private MessageContext createMessageContext(HttpRequest request) {
MessageContext msgContext = new MessageContext();
- msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.TRUE);
+ // There is a discrepency in what I thought, Axis2 spawns a nes threads to
+ // send a message is this is TRUE - and I want it to be the other way
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
msgContext.setConfigurationContext(cfgCtx);
if (isHttps) {
msgContext.setTransportOut(cfgCtx.getAxisConfiguration()
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org