You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by ra...@apache.org on 2008/01/05 19:35:47 UTC
svn commit: r609189 - in /webservices/synapse/trunk/java: ./
modules/transports/src/main/java/org/apache/synapse/transport/amqp/
Author: rajith
Date: Sat Jan 5 10:35:46 2008
New Revision: 609189
URL: http://svn.apache.org/viewvc?rev=609189&view=rev
Log:
Initial code drop for a native AMQP transport based on the Apache Qpid project.
JIRA no is https://issues.apache.org/jira/browse/SYNAPSE-223
There is still outstanding work, most notably the TransportListener and test cases needs to be written.
Since Apache Qpid hasn't released an 0-10 client yet, I have uploaded Qpid jars to my private repo hosted in my apache home dir.
I tested and the dependencies resolve properly for me. However if you find deps are not resolving, please feel free to rollback the changes
Added:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
Modified:
webservices/synapse/trunk/java/pom.xml
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,109 @@
+package org.apache.synapse.transport.amqp;
+
+import org.apache.axis2.client.Options;
+
+public class AMQPConstants
+{
+
+ /**
+ * The prefix indicating an Axis JMS URL
+ */
+ public static final String AMQP_PREFIX = "amqp:";
+
+ //------------------------------------ defaults ------------------------------------
+ /**
+ * The local (Axis2) AMQP connection name of the default connection
+ * to be used.
+ */
+ public static final String DEFAULT_CONNECTION = "default";
+ /**
+ * The default AMQP time out waiting for a reply
+ */
+ public static final long DEFAULT_AMQP_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
+
+ //-------------------------- services.xml parameters --------------------------------
+ /**
+ * The Parameter name indicating the amqp destination for requests
+ */
+ public static final String CONNECTION_URL_PARAM = "transport.amqp.ConnectionURL";
+
+ public static final String EXCHANGE_NAME_PARAM = "transport.amqp.ExchangeName";
+
+ public static final String EXCHANGE_TYPE_PARAM = "transport.amqp.ExchangeType";
+
+ public static final String ROUTING_KEY_PARAM = "transport.amqp.RoutingKey";
+
+ /**
+ * The Parameter name indicating the response AMQP destination
+ */
+ public static final String REPLY_EXCHANGE_TYPE_PARAM = "transport.amqp.ReplyExchangeType";
+ public static final String REPLY_EXCHANGE_NAME_PARAM = "transport.amqp.ReplyExchangeName";
+ /**
+ * The Parameter name indicating the response AMQP destination class.Ex direct,topic,fannot ..etc
+ */
+ public static final String REPLY_ROUTING_KEY_PARAM = "transport.amqp.ReplyRoutingKey";
+
+ /**
+ * The Parameter name of an Axis2 service, indicating the AMQP connection
+ * which should be used to listen for messages for it.
+ */
+ public static final String CONNECTION_PARAM = "transport.amqp.Connection";
+ /**
+ * If reconnect timeout if connection error occurs in seconds
+ */
+ public static final String RECONNECT_TIMEOUT = "transport.amqp.ReconnectTimeout";
+
+ //------------ message context / transport header properties and client options ------------
+ /**
+ * A MessageContext property or client Option stating the time to wait for a response JMS message
+ */
+ public static final String AMQP_WAIT_REPLY = "AMQP_WAIT_REPLY";
+ /**
+ * A MessageContext property or client Option stating the AMQP correlation id
+ */
+ public static final String AMQP_CORELATION_ID = "AMQP_CORELATION_ID";
+ /**
+ * A MessageContext property or client Option stating the AMQP message id
+ */
+ public static final String AMQP_MESSAGE_ID = "AMQP_MESSAGE_ID";
+ /**
+ * A MessageContext property or client Option stating the AMQP delivery mode
+ */
+ public static final String AMQP_DELIVERY_MODE = "AMQP_DELIVERY_MODE";
+ /**
+ * A MessageContext property or client Option stating the AMQP destination
+ */
+ public static final String AMQP_EXCHANGE_NAME = "AMQP_EXCHANGE_NAME";
+
+ public static final String AMQP_EXCHANGE_TYPE = "AMQP_EXCHANGE_TYPE";
+
+ public static final String AMQP_ROUTING_KEY = "AMQP_ROUTING_KEY";
+ /**
+ * A MessageContext property or client Option stating the AMQP expiration
+ */
+ public static final String AMQP_EXPIRATION = "AMQP_EXPIRATION";
+ /**
+ * A MessageContext property or client Option stating the AMQP priority
+ */
+ public static final String AMQP_PRIORITY = "AMQP_PRIORITY";
+ /**
+ * A MessageContext property stating if the message is a redelivery
+ */
+ public static final String AMQP_REDELIVERED = "AMQP_REDELIVERED";
+ /**
+ * A MessageContext property or client Option stating the AMQP replyTo
+ */
+ public static final String AMQP_REPLY_TO_EXCHANGE_NAME = "AMQP_REPLY_TO_EXCHANGE_NAME";
+
+ public static final String AMQP_REPLY_TO_EXCHANGE_TYPE = "AMQP_REPLY_TO_EXCHANGE_TYPE";
+
+ public static final String AMQP_REPLY_TO_ROUTING_KEY = "AMQP_REPLY_TO_ROUTING_KEY";
+ /**
+ * A MessageContext property or client Option stating the AMQP timestamp
+ */
+ public static final String AMQP_TIMESTAMP = "AMQP_TIMESTAMP";
+ /**
+ * A MessageContext property or client Option stating the AMQP type
+ */
+ public static final String AMQP_CONTENT_TYPE = "AMQP_CONTENT_TYPE";
+}
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,35 @@
+package org.apache.synapse.transport.amqp;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.description.AxisService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.base.AbstractTransportListener;
+
+public class AMQPListener extends AbstractTransportListener
+{
+ public static final String TRANSPORT_NAME = "jms";
+ private static final Log log = LogFactory.getLog(AMQPListener.class);
+
+ @Override
+ protected void startListeningForService(AxisService service)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void stopListeningForService(AxisService service)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public EndpointReference[] getEPRsForService(String arg0, String arg1) throws AxisFault
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,57 @@
+package org.apache.synapse.transport.amqp;
+
+import java.util.Map;
+
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AMQPOutTransportInfo implements OutTransportInfo
+{
+
+ private static final Log log = LogFactory.getLog(OutTransportInfo.class);
+ private String address = null;
+ private String contentType = null;
+ private String conURL = null;
+ private String exchangeName = null;
+ private String routingKey = null;
+
+ public AMQPOutTransportInfo(String address)
+ {
+ this.address = address;
+ if (!address.startsWith(AMQPConstants.AMQP_PREFIX)) {
+ handleException("Invalid prefix for a AMQP EPR : " + address);
+ } else {
+ Map props = URIParser.parse(address);
+ conURL = (String)props.get(AMQPConstants.CONNECTION_URL_PARAM);
+ routingKey = (String)props.get(AMQPConstants.ROUTING_KEY_PARAM);
+ exchangeName = (String)props.get(AMQPConstants.EXCHANGE_NAME_PARAM);
+ }
+ }
+
+ public String getConnectionURL(){
+ return conURL;
+ }
+
+ public String getExchangeName(){
+ return exchangeName;
+ }
+
+ public String getRoutingKey(){
+ return routingKey;
+ }
+
+ public void setContentType(String contentType){
+ this.contentType = contentType;
+ }
+
+ private void handleException(String s, Exception e) {
+ log.error(s, e);
+ throw new AMQPSynapseException(s,e);
+ }
+
+ private void handleException(String s) {
+ log.error(s);
+ throw new AMQPSynapseException(s);
+ }
+}
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,327 @@
+package org.apache.synapse.transport.amqp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.MessageFormatter;
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.ReplyTo;
+import org.apache.synapse.transport.base.AbstractTransportSender;
+import org.apache.synapse.transport.base.BaseConstants;
+import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSUtils;
+
+public class AMQPSender extends AbstractTransportSender {
+
+ public static final String TRANSPORT_NAME = "amqp";
+ private static final Log log = LogFactory.getLog(AMQPSender.class);
+ private Map<String,ConnectionDetails> _connectionDetails = new HashMap<String,ConnectionDetails>();
+
+ @Override
+ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
+ setTransportName(TRANSPORT_NAME);
+ super.init(cfgCtx, transportOut);
+ // init connections
+ }
+
+ /**
+ * Needs a more robust strategy to cache connections and sessions
+ * For efficiency I assume that the reply to exchange,queue and the binding already exists.
+ *
+ * For synchrouns request/reponse a temp queue will be create and bound to the direct exchange.
+ */
+ @Override
+ public void sendMessage(MessageContext msgCtx, String targetEPR, OutTransportInfo outTransportInfo) throws AxisFault{
+
+ AMQPOutTransportInfo amqpTransportInfo = null;
+ ConnectionDetails conDetails = null;
+ Session session = null;
+
+ // If targetEPR is not null, determine the addressing info from it
+ if (targetEPR != null) {
+ amqpTransportInfo = new AMQPOutTransportInfo(targetEPR);
+ // do we have a definition for a connection factory to use for this address?
+
+ }
+ // If not try to get the addressing info from the transport description
+ else if (outTransportInfo != null && outTransportInfo instanceof AMQPOutTransportInfo) {
+ amqpTransportInfo = (AMQPOutTransportInfo) outTransportInfo;
+ }
+
+ if (_connectionDetails.containsKey(amqpTransportInfo.getConnectionURL())){
+ conDetails = _connectionDetails.get(amqpTransportInfo.getConnectionURL());
+ }else{
+ // else create a new connection
+ Connection con = Client.createConnection();
+ try{
+ con.connect(amqpTransportInfo.getConnectionURL());
+ }catch(Exception e){
+ throw new AMQPSynapseException("Error creating a connection to the broker",e);
+ }
+ _connectionDetails.put(amqpTransportInfo.getConnectionURL(), new ConnectionDetails(con));
+ }
+
+ if (conDetails != null) {
+ session = conDetails.getSession();
+ }
+
+ byte[] message = null;
+ String correlationId = null;
+ try {
+ message = createMessageData(msgCtx);
+ } catch (AMQPSynapseException e) {
+ handleException("Error creating a message from the axis message context", e);
+ }
+
+ // should we wait for a synchronous response on this same thread?
+ boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties msgProps = new MessageProperties();
+ fillMessageHeaders(msgCtx,amqpTransportInfo,session,waitForResponse,deliveryProps,msgProps);
+
+ synchronized(session){
+ session.header(msgProps,deliveryProps);
+ session.data(message);
+ session.endData();
+ }
+
+ // if we are expecting a synchronous response back for the message sent out
+ if (waitForResponse) {
+ waitForResponseAndProcess(session, msgProps, msgCtx);
+ }
+ }
+
+ private void fillMessageHeaders(MessageContext msgCtx, AMQPOutTransportInfo amqpTransportInfo,
+ Session session, boolean waitForResponse,
+ DeliveryProperties deliveryProps, MessageProperties msgProps){
+ // Routing info
+ deliveryProps.setExchange(amqpTransportInfo.getExchangeName());
+ deliveryProps.setRoutingKey(amqpTransportInfo.getRoutingKey());
+
+ /* For efficiency I assume that the reply to exchange and destination is already created
+ * If the reply is for the same service, then this should be the queue that the service is listening to
+ * Blindly creating these exchanges,queues and bindings is sub optimal and can be avoid if the administrator
+ * creates the nessacery exchanges,queues and bindings before hand.
+ *
+ * If the service hasn't specify and it's a request/reply MEP then a temporary queue
+ * (which is auto-deleted) is created and bound to the amq.direct exchange.
+ */
+ if (msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null){
+ String replyExchangeName = (String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME);
+ String replyRoutingKey = msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY)!= null?(String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY):null;
+
+ // for fannout exchange or some other custom exchange, the routing key maybe null
+ msgProps.setReplyTo(new ReplyTo(replyExchangeName,replyRoutingKey));
+ }
+
+ // Content type
+ OMOutputFormat format = BaseUtils.getOMOutputFormat(msgCtx);
+ MessageFormatter messageFormatter = null;
+ try {
+ messageFormatter = TransportUtils.getMessageFormatter(msgCtx);
+ } catch (AxisFault axisFault) {
+ throw new AMQPSynapseException("Unable to get the message formatter to use");
+ }
+
+ String contentType = messageFormatter.getContentType(
+ msgCtx, format, msgCtx.getSoapAction());
+ msgProps.setContentType(contentType);
+
+ // Custom properties - SOAP ACTION
+ Map<String,Object> props = new HashMap();
+
+ if (msgCtx.isServerSide()) {
+ // set SOAP Action as a property on the JMS message
+ props.put(BaseConstants.SOAPACTION,(String)msgCtx.getProperty(BaseConstants.SOAPACTION));
+
+ } else {
+ String action = msgCtx.getOptions().getAction();
+ if (action != null) {
+ props.put(BaseConstants.SOAPACTION, action);
+ }
+ }
+
+ msgProps.setApplicationHeaders(props);
+
+ // transport headers
+ Map headerMap = (Map) msgCtx.getProperty(MessageContext.TRANSPORT_HEADERS);
+
+ if (headerMap != null){
+ Iterator iter = headerMap.keySet().iterator();
+ while (iter.hasNext()) {
+
+ String name = (String) iter.next();
+
+ if (AMQPConstants.AMQP_CORELATION_ID.equals(name)) {
+ msgProps.setCorrelationId((String) headerMap.get(AMQPConstants.AMQP_CORELATION_ID));
+ }
+ else if (AMQPConstants.AMQP_DELIVERY_MODE.equals(name)) {
+ Object o = headerMap.get(AMQPConstants.AMQP_DELIVERY_MODE);
+ if (o instanceof Short) {
+ deliveryProps.setDeliveryMode(((Short) o).shortValue());
+ }else if (o instanceof Integer) {
+ deliveryProps.setDeliveryMode(((Integer) o).shortValue());
+ }else if (o instanceof String) {
+ try {
+ deliveryProps.setDeliveryMode(Short.parseShort((String) o));
+ } catch (NumberFormatException nfe) {
+ log.warn("Invalid delivery mode ignored : " + o, nfe);
+ }
+ } else {
+ log.warn("Invalid delivery mode ignored : " + o);
+ }
+ }
+ else if (AMQPConstants.AMQP_EXPIRATION.equals(name)) {
+ deliveryProps.setExpiration(
+ Long.parseLong((String) headerMap.get(AMQPConstants.AMQP_EXPIRATION)));
+ }
+ else if (AMQPConstants.AMQP_MESSAGE_ID.equals(name)) {
+ msgProps.setMessageId((String) headerMap.get(AMQPConstants.AMQP_MESSAGE_ID));
+ }
+ else if (AMQPConstants.AMQP_PRIORITY.equals(name)) {
+ deliveryProps.setPriority(
+ Short.parseShort((String) headerMap.get(AMQPConstants.AMQP_PRIORITY)));
+ }
+ else if (AMQPConstants.AMQP_TIMESTAMP.equals(name)) {
+ deliveryProps.setTimestamp(
+ Long.parseLong((String) headerMap.get(AMQPConstants.AMQP_TIMESTAMP)));
+ }else {
+ // custom app props
+ Object value = headerMap.get(name);
+ props.put(name, value);
+ }
+ }
+ }
+ // If it's request/response, then we need to fill in corelation id and reply to properties
+ if (waitForResponse && msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID) == null) {
+ msgProps.setCorrelationId(UUIDGenerator.getUUID());
+ if (msgProps.getReplyTo() == null){
+ // We need to use a temp queue here.
+ String tempQueueName = "Queue_" + msgProps.getCorrelationId();
+ synchronized(session){
+ session.queueDeclare(tempQueueName, null, null, Option.AUTO_DELETE,Option.EXCLUSIVE);
+ session.queueBind(tempQueueName, "amq.direct", tempQueueName, null);
+ session.sync();
+ }
+ msgProps.replyTo(new ReplyTo("amq.direct",tempQueueName));
+ }
+ }
+ }
+
+ private byte[] createMessageData(MessageContext msgContext){
+ OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+ MessageFormatter messageFormatter = null;
+ try {
+ messageFormatter = TransportUtils.getMessageFormatter(msgContext);
+ } catch (AxisFault axisFault) {
+ throw new AMQPSynapseException("Unable to get the message formatter to use",axisFault);
+ }
+
+ String contentType = messageFormatter.getContentType(
+ msgContext, format, msgContext.getSoapAction());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ messageFormatter.writeTo(msgContext, format, baos, true);
+ baos.flush();
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new AMQPSynapseException("IO Error while creating message", e);
+ }
+ }
+
+ private void waitForResponseAndProcess(Session session, MessageProperties msgProps,
+ MessageContext msgCtx) throws AxisFault {
+
+ long timeout = AMQPConstants.DEFAULT_AMQP_TIMEOUT;
+ String waitReply = (String) msgCtx.getProperty(AMQPConstants.AMQP_WAIT_REPLY);
+ if (waitReply != null) {
+ timeout = Long.valueOf(waitReply).longValue();
+ }
+ // We are using the routing key (which is the queue name) as the destination
+ String destination = msgProps.getReplyTo().getRoutingKey();
+ MessageManager listener = new MessageManager(session,destination,msgProps.getCorrelationId());
+ session.messageSubscribe(msgProps.getReplyTo().getRoutingKey(),
+ destination,
+ Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+ Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
+ new MessagePartListenerAdapter(listener), null, Option.NO_OPTION);
+
+ Message reply = listener.receive(timeout);
+
+ if (reply != null) {
+ processSyncResponse(msgCtx, reply);
+
+ } else {
+ log.warn("Did not receive a response within " +
+ timeout + " ms to destination : " + msgProps.getReplyTo().getRoutingKey() +
+ " with correlation ID : " + msgProps.getCorrelationId());
+ }
+ }
+
+ private void processSyncResponse(MessageContext outMsgCtx, Message message) throws AxisFault {
+
+ MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
+
+ // load any transport headers from received message
+ Map map = AMQPUtils.getTransportHeaders(message);
+ responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, map);
+
+ // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response
+ // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This
+ // question is still under debate and due to the timelines, I am commiting this
+ // workaround as Axis2 1.2 is about to be released and Synapse 1.0
+ responseMsgCtx.setServerSide(false);
+
+ String contentType = JMSUtils.getInstace().getProperty(message, BaseConstants.CONTENT_TYPE);
+
+ AMQPUtils.getInstace().setSOAPEnvelope(message, responseMsgCtx, contentType);
+ responseMsgCtx.setServerSide(true);
+
+ handleIncomingMessage(
+ responseMsgCtx,
+ map,
+ (String)map.get(BaseConstants.SOAPACTION),
+ contentType
+ );
+ }
+
+ private class ConnectionDetails{
+ private Connection _conn;
+ //private Map _sessions = new HashMap();
+ private Session _session;
+
+ public ConnectionDetails(Connection conn){
+ _conn = conn;
+ }
+
+ public Session getSession(){
+ if (_session != null){
+ _session = _conn.createSession(0);
+ }
+ return _session;
+ }
+ }
+
+}
\ No newline at end of file
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,12 @@
+package org.apache.synapse.transport.amqp;
+
+public class AMQPSynapseException extends RuntimeException
+{
+ public AMQPSynapseException(String s, Exception e){
+ super(s,e);
+ }
+
+ public AMQPSynapseException(String s){
+ super(s);
+ }
+}
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,134 @@
+package org.apache.synapse.transport.amqp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSConstants;
+
+public class AMQPUtils extends BaseUtils
+{
+
+ private static final Log log = LogFactory.getLog(AMQPUtils.class);
+
+ private static BaseUtils _instance = new AMQPUtils();
+
+ public static BaseUtils getInstace() {
+ return _instance;
+ }
+
+ @Override
+ public InputStream getInputStream(Object message)
+ {
+ Message msg = (Message)message;
+ try{
+ final ByteBuffer buf = msg.readData();
+ return new InputStream() {
+ public synchronized int read() throws IOException {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+ return buf.get();
+ }
+
+ public synchronized int read(byte[] bytes, int off, int len) throws IOException {
+ // Read only what's left
+ len = Math.min(len, buf.remaining());
+ buf.get(bytes, off, len);
+ return len;
+ }
+ };
+ }catch(IOException e){
+ throw new AMQPSynapseException("Error reading payload",e);
+ }
+ }
+
+ @Override
+ public byte[] getMessageBinaryPayload(Object message)
+ {
+ return null;
+ }
+
+ @Override
+ public String getMessageTextPayload(Object message)
+ {
+ return null;
+ }
+
+ @Override
+ public String getProperty(Object message, String property)
+ {
+ try {
+ return (String)((Message)message).getMessageProperties().getApplicationHeaders().get(property);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ /**
+ * Extract transport level headers for JMS from the given message into a Map
+ *
+ * @param message the JMS message
+ * @return a Map of the transport headers
+ */
+ public static Map getTransportHeaders(Message message) {
+ // create a Map to hold transport headers
+ Map map = new HashMap();
+
+ // correlation ID
+ if (message.getMessageProperties().getCorrelationId() != null) {
+ map.put(AMQPConstants.AMQP_CORELATION_ID, message.getMessageProperties().getCorrelationId());
+ }
+
+ // set the delivery mode as persistent or not
+ try {
+ map.put(AMQPConstants.AMQP_DELIVERY_MODE,message.getDeliveryProperties().getDeliveryMode());
+ } catch (Exception ignore) {}
+
+ // destination name
+ map.put(AMQPConstants.AMQP_EXCHANGE_NAME,message.getDeliveryProperties().getExchange());
+ map.put(AMQPConstants.AMQP_ROUTING_KEY,message.getDeliveryProperties().getRoutingKey());
+
+ // expiration
+ try {
+ map.put(AMQPConstants.AMQP_EXPIRATION, message.getDeliveryProperties().getExpiration());
+ } catch (Exception ignore) {}
+
+ // if a JMS message ID is found
+ if (message.getMessageProperties().getMessageId() != null) {
+ map.put(AMQPConstants.AMQP_MESSAGE_ID, message.getMessageProperties().getMessageId());
+ }
+
+ // priority
+ map.put(AMQPConstants.AMQP_PRIORITY,message.getDeliveryProperties().getPriority());
+
+ // redelivered
+ map.put(AMQPConstants.AMQP_REDELIVERED, message.getDeliveryProperties().getRedelivered());
+
+ // replyto destination name
+ if (message.getMessageProperties().getReplyTo() != null) {
+ map.put(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME, message.getMessageProperties().getReplyTo().getExchangeName());
+ map.put(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY, message.getMessageProperties().getReplyTo().getRoutingKey());
+ }
+
+ // priority
+ map.put(AMQPConstants.AMQP_TIMESTAMP,message.getDeliveryProperties().getTimestamp());
+
+ // any other transport properties / headers
+ map.putAll(message.getMessageProperties().getApplicationHeaders());
+
+ return map;
+ }
+}
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,72 @@
+package org.apache.synapse.transport.amqp;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.transport.RangeSet;
+
+public class MessageManager implements MessageListener
+{
+ private static final Log log = LogFactory.getLog(AMQPSender.class);
+ private ArrayBlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(1,true);
+ private Session session;
+ private String destination;
+ private String corelationId;
+
+ public MessageManager(Session session, String destination,String corelationId){
+ this.session = session;
+ this.destination = destination;
+ }
+
+ /*
+ * when this mehtod is called, it is assumed that we have exclusive access
+ * to the session.
+ */
+ public Message receive(long timeout){
+ Message m;
+ session.messageFlow(destination,Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ session.messageFlow(destination,Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ try{
+ m = queue.poll(timeout, TimeUnit.MILLISECONDS);
+ }catch(Exception e){
+ throw new AMQPSynapseException("unable to receive message",e);
+ }
+
+ if (m == null)
+ {
+ log.debug("Message Didn't arrive in time, checking if one is inflight");
+ // checking if one is inflight
+ session.messageFlush(destination);
+ session.sync();
+ try{
+ m = queue.take();
+ }catch(Exception e){
+ throw new AMQPSynapseException("unable to receive message",e);
+ }
+ }
+
+ return m;
+ }
+
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+
+ //AMQP currently doesn't support server side filters, so doing client side temporarily
+ if(corelationId.equals(m.getMessageProperties().getCorrelationId())){
+ queue.add(m);
+ }else{
+ RangeSet r = new RangeSet();
+ r.add(m.getMessageTransferId());
+ session.messageRelease(r);
+ }
+ }
+}
Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java Sat Jan 5 10:35:46 2008
@@ -0,0 +1,34 @@
+package org.apache.synapse.transport.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** sample uri formats - this is temporary until the AMQP WG defines a proper addressing scheme
+*
+* uri="amqp:/direct?transport.amqp.RoutingKey=SimpleStockQuoteService&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
+* amqp:/topic?transport.amqp.RoutingKey=weather.us.ny&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672
+*/
+public class URIParser
+{
+
+ public static Map parse(String uri){
+ Map props = new HashMap();
+ String dest = uri.substring(6,uri.indexOf("?"));
+ if (dest == null || dest.trim().equals("")){
+ throw new IllegalArgumentException("destination cannot be null");
+ }
+ props.put(AMQPConstants.EXCHANGE_NAME_PARAM, dest);
+ String paramStr = uri.substring(uri.indexOf("?")+1,uri.length());
+ String[] params = paramStr.split("&");
+ for (String param:params){
+ String key = param.substring(0,param.indexOf("="));
+ String value = param.substring(param.indexOf("=")+1,param.length());
+ props.put(key, value);
+ }
+ return props;
+ }
+
+ public static void main(String[] args){
+ Map p = URIParser.parse("amqp:/direct?routing_key=SimpleStockQuoteService&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672");
+ }
+}
Modified: webservices/synapse/trunk/java/pom.xml
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/pom.xml?rev=609189&r1=609188&r2=609189&view=diff
==============================================================================
--- webservices/synapse/trunk/java/pom.xml (original)
+++ webservices/synapse/trunk/java/pom.xml Sat Jan 5 10:35:46 2008
@@ -673,6 +673,16 @@
<version>${commons.vfs.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-common</artifactId>
+ <version>${qpid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ <version>${qpid.version}</version>
+ </dependency>
+ <dependency>
<groupId>de.schlichtherle.io</groupId>
<artifactId>truezip</artifactId>
<version>${truezip.version}</version>
@@ -1034,6 +1044,17 @@
<updatePolicy>daily</updatePolicy>
</snapshots>
</repository>
+ <repository>
+ <id>qpid-private</id>
+ <name>Private repo for qpid jars</name>
+ <url>http://people.apache.org/~rajith/maven2/</url>
+ <releases>
+ <updatePolicy>never</updatePolicy>
+ </releases>
+ <snapshots>
+ <updatePolicy>daily</updatePolicy>
+ </snapshots>
+ </repository>
</repositories>
<modules>
@@ -1109,6 +1130,8 @@
<bsf.version>3.0-beta2</bsf.version>
<groovy.version>1.0</groovy.version>
<servlet-api.version>2.3</servlet-api.version>
+
+ <qpid.version>1.0-incubating-M2-SNAPSHOT</qpid.version>
</properties>
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org