You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by ra...@apache.org on 2008/01/05 19:35:47 UTC

svn commit: r609189 - in /webservices/synapse/trunk/java: ./ modules/transports/src/main/java/org/apache/synapse/transport/amqp/

Author: rajith
Date: Sat Jan  5 10:35:46 2008
New Revision: 609189

URL: http://svn.apache.org/viewvc?rev=609189&view=rev
Log:
Initial code drop for a native AMQP transport based on the Apache Qpid project.
JIRA no is https://issues.apache.org/jira/browse/SYNAPSE-223
There is still outstanding work, most notably the TransportListener and test cases needs to be written.
Since Apache Qpid hasn't released an 0-10 client yet, I have uploaded Qpid jars to my private repo hosted in my apache home dir.
I tested and the dependencies resolve properly for me. However if you find deps are not resolving, please feel free to rollback the changes


Added:
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java
    webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
Modified:
    webservices/synapse/trunk/java/pom.xml

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,109 @@
+package org.apache.synapse.transport.amqp;
+
+import org.apache.axis2.client.Options;
+
+public class AMQPConstants
+{
+
+    /**
+     * The prefix indicating an Axis JMS URL
+     */
+    public static final String AMQP_PREFIX = "amqp:";
+
+    //------------------------------------ defaults ------------------------------------
+    /**
+     * The local (Axis2) AMQP connection name of the default connection
+     * to be used.
+     */
+    public static final String DEFAULT_CONNECTION = "default";
+    /**
+     * The default AMQP time out waiting for a reply
+     */
+    public static final long DEFAULT_AMQP_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
+
+    //-------------------------- services.xml parameters --------------------------------
+    /**
+     * The Parameter name indicating the amqp destination for requests
+     */
+    public static final String CONNECTION_URL_PARAM = "transport.amqp.ConnectionURL";
+
+    public static final String EXCHANGE_NAME_PARAM = "transport.amqp.ExchangeName";
+
+    public static final String EXCHANGE_TYPE_PARAM = "transport.amqp.ExchangeType";
+
+    public static final String ROUTING_KEY_PARAM = "transport.amqp.RoutingKey";
+
+    /**
+     * The Parameter name indicating the response AMQP destination
+     */
+    public static final String REPLY_EXCHANGE_TYPE_PARAM = "transport.amqp.ReplyExchangeType";
+    public static final String REPLY_EXCHANGE_NAME_PARAM = "transport.amqp.ReplyExchangeName";
+    /**
+     * The Parameter name indicating the response AMQP destination class.Ex direct,topic,fannot ..etc
+     */
+    public static final String REPLY_ROUTING_KEY_PARAM = "transport.amqp.ReplyRoutingKey";
+
+    /**
+     * The Parameter name of an Axis2 service, indicating the AMQP connection
+     * which should be used to listen for messages for it.
+     */
+    public static final String CONNECTION_PARAM = "transport.amqp.Connection";
+    /**
+     * If reconnect timeout if connection error occurs in seconds
+     */
+    public static final String RECONNECT_TIMEOUT = "transport.amqp.ReconnectTimeout";
+
+    //------------ message context / transport header properties and client options ------------
+    /**
+     * A MessageContext property or client Option stating the time to wait for a response JMS message
+     */
+    public static final String AMQP_WAIT_REPLY = "AMQP_WAIT_REPLY";
+    /**
+     * A MessageContext property or client Option stating the AMQP correlation id
+     */
+    public static final String AMQP_CORELATION_ID = "AMQP_CORELATION_ID";
+    /**
+     * A MessageContext property or client Option stating the AMQP message id
+     */
+    public static final String AMQP_MESSAGE_ID = "AMQP_MESSAGE_ID";
+    /**
+     * A MessageContext property or client Option stating the AMQP delivery mode
+     */
+    public static final String AMQP_DELIVERY_MODE = "AMQP_DELIVERY_MODE";
+    /**
+     * A MessageContext property or client Option stating the AMQP destination
+     */
+    public static final String AMQP_EXCHANGE_NAME = "AMQP_EXCHANGE_NAME";
+
+    public static final String AMQP_EXCHANGE_TYPE = "AMQP_EXCHANGE_TYPE";
+
+    public static final String AMQP_ROUTING_KEY = "AMQP_ROUTING_KEY";
+    /**
+     * A MessageContext property or client Option stating the AMQP expiration
+     */
+    public static final String AMQP_EXPIRATION = "AMQP_EXPIRATION";
+    /**
+     * A MessageContext property or client Option stating the AMQP priority
+     */
+    public static final String AMQP_PRIORITY = "AMQP_PRIORITY";
+    /**
+     * A MessageContext property stating if the message is a redelivery
+     */
+    public static final String AMQP_REDELIVERED = "AMQP_REDELIVERED";
+    /**
+     * A MessageContext property or client Option stating the AMQP replyTo
+     */
+    public static final String AMQP_REPLY_TO_EXCHANGE_NAME = "AMQP_REPLY_TO_EXCHANGE_NAME";
+
+    public static final String AMQP_REPLY_TO_EXCHANGE_TYPE = "AMQP_REPLY_TO_EXCHANGE_TYPE";
+
+    public static final String AMQP_REPLY_TO_ROUTING_KEY = "AMQP_REPLY_TO_ROUTING_KEY";
+    /**
+     * A MessageContext property or client Option stating the AMQP timestamp
+     */
+    public static final String AMQP_TIMESTAMP = "AMQP_TIMESTAMP";
+    /**
+     * A MessageContext property or client Option stating the AMQP type
+     */
+    public static final String AMQP_CONTENT_TYPE = "AMQP_CONTENT_TYPE";
+}

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,35 @@
+package org.apache.synapse.transport.amqp;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.description.AxisService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.base.AbstractTransportListener;
+
+public class AMQPListener extends AbstractTransportListener
+{
+    public static final String TRANSPORT_NAME = "jms";
+    private static final Log log = LogFactory.getLog(AMQPListener.class);
+
+    @Override
+    protected void startListeningForService(AxisService service)
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void stopListeningForService(AxisService service)
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    public EndpointReference[] getEPRsForService(String arg0, String arg1) throws AxisFault
+    {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,57 @@
+package org.apache.synapse.transport.amqp;
+
+import java.util.Map;
+
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AMQPOutTransportInfo implements OutTransportInfo
+{
+
+    private static final Log log = LogFactory.getLog(OutTransportInfo.class);
+    private String address = null;
+    private String contentType = null;
+    private String conURL = null;
+    private String exchangeName = null;
+    private String routingKey = null;
+
+    public AMQPOutTransportInfo(String address)
+    {
+        this.address = address;
+        if (!address.startsWith(AMQPConstants.AMQP_PREFIX)) {
+            handleException("Invalid prefix for a AMQP EPR : " + address);
+        } else {
+           Map props = URIParser.parse(address);
+           conURL = (String)props.get(AMQPConstants.CONNECTION_URL_PARAM);
+           routingKey = (String)props.get(AMQPConstants.ROUTING_KEY_PARAM);
+           exchangeName = (String)props.get(AMQPConstants.EXCHANGE_NAME_PARAM);
+        }
+    }
+
+    public String getConnectionURL(){
+        return conURL;
+    }
+
+    public String getExchangeName(){
+        return exchangeName;
+    }
+
+    public String getRoutingKey(){
+        return routingKey;
+    }
+
+    public void setContentType(String contentType){
+        this.contentType = contentType;
+    }
+
+    private void handleException(String s, Exception e) {
+        log.error(s, e);
+        throw new AMQPSynapseException(s,e);
+    }
+
+    private void handleException(String s) {
+        log.error(s);
+        throw new AMQPSynapseException(s);
+    }
+}

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,327 @@
+package org.apache.synapse.transport.amqp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.MessageFormatter;
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.ReplyTo;
+import org.apache.synapse.transport.base.AbstractTransportSender;
+import org.apache.synapse.transport.base.BaseConstants;
+import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSUtils;
+
+public class AMQPSender extends AbstractTransportSender {
+
+    public static final String TRANSPORT_NAME = "amqp";
+    private static final Log log = LogFactory.getLog(AMQPSender.class);
+    private Map<String,ConnectionDetails> _connectionDetails = new HashMap<String,ConnectionDetails>();
+
+    @Override
+    public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
+        setTransportName(TRANSPORT_NAME);
+        super.init(cfgCtx, transportOut);
+        // init connections
+    }
+
+    /**
+     * Needs a more robust strategy to cache connections and sessions
+     * For efficiency I assume that the reply to exchange,queue and the binding already exists.
+     *
+     * For synchrouns request/reponse a temp queue will be create and bound to the direct exchange.
+     */
+    @Override
+    public void sendMessage(MessageContext msgCtx, String targetEPR, OutTransportInfo outTransportInfo) throws AxisFault{
+
+        AMQPOutTransportInfo amqpTransportInfo = null;
+        ConnectionDetails conDetails = null;
+        Session session = null;
+
+        // If targetEPR is not null, determine the addressing info from it
+        if (targetEPR != null) {
+            amqpTransportInfo = new AMQPOutTransportInfo(targetEPR);
+            // do we have a definition for a connection factory to use for this address?
+
+        }
+        // If not try to get the addressing info from the transport description
+        else if (outTransportInfo != null && outTransportInfo instanceof AMQPOutTransportInfo) {
+            amqpTransportInfo = (AMQPOutTransportInfo) outTransportInfo;
+        }
+
+        if (_connectionDetails.containsKey(amqpTransportInfo.getConnectionURL())){
+            conDetails = _connectionDetails.get(amqpTransportInfo.getConnectionURL());
+        }else{
+            // else create a new connection
+            Connection con = Client.createConnection();
+            try{
+                con.connect(amqpTransportInfo.getConnectionURL());
+            }catch(Exception e){
+                throw new AMQPSynapseException("Error creating a connection to the broker",e);
+            }
+            _connectionDetails.put(amqpTransportInfo.getConnectionURL(), new ConnectionDetails(con));
+        }
+
+        if (conDetails != null) {
+            session = conDetails.getSession();
+        }
+
+        byte[] message = null;
+        String correlationId = null;
+        try {
+            message = createMessageData(msgCtx);
+        } catch (AMQPSynapseException e) {
+            handleException("Error creating a message from the axis message context", e);
+        }
+
+        // should we wait for a synchronous response on this same thread?
+        boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+
+        DeliveryProperties deliveryProps = new DeliveryProperties();
+        MessageProperties msgProps = new MessageProperties();
+        fillMessageHeaders(msgCtx,amqpTransportInfo,session,waitForResponse,deliveryProps,msgProps);
+
+        synchronized(session){
+            session.header(msgProps,deliveryProps);
+            session.data(message);
+            session.endData();
+        }
+
+        // if we are expecting a synchronous response back for the message sent out
+        if (waitForResponse) {
+            waitForResponseAndProcess(session, msgProps, msgCtx);
+        }
+    }
+
+    private void fillMessageHeaders(MessageContext msgCtx, AMQPOutTransportInfo amqpTransportInfo,
+                                    Session session, boolean waitForResponse,
+                                    DeliveryProperties deliveryProps, MessageProperties msgProps){
+        // Routing info
+        deliveryProps.setExchange(amqpTransportInfo.getExchangeName());
+        deliveryProps.setRoutingKey(amqpTransportInfo.getRoutingKey());
+
+        /* For efficiency I assume that the reply to exchange and destination is already created
+        *  If the reply is for the same service, then this should be the queue that the service is listening to
+        *  Blindly creating these exchanges,queues and bindings is sub optimal and can be avoid if the administrator
+        *  creates the nessacery exchanges,queues and bindings before hand.
+        *
+        *  If the service hasn't specify and it's a request/reply MEP then a temporary queue
+        *  (which is auto-deleted) is created and bound to the amq.direct exchange.
+        */
+        if (msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null){
+            String replyExchangeName = (String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME);
+            String replyRoutingKey = msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY)!= null?(String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY):null;
+
+            // for fannout exchange or some other custom exchange, the routing key maybe null
+            msgProps.setReplyTo(new ReplyTo(replyExchangeName,replyRoutingKey));
+        }
+
+        // Content type
+        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgCtx);
+        MessageFormatter messageFormatter = null;
+        try {
+            messageFormatter = TransportUtils.getMessageFormatter(msgCtx);
+        } catch (AxisFault axisFault) {
+            throw new AMQPSynapseException("Unable to get the message formatter to use");
+        }
+
+        String contentType = messageFormatter.getContentType(
+                msgCtx, format, msgCtx.getSoapAction());
+        msgProps.setContentType(contentType);
+
+        // Custom properties - SOAP ACTION
+        Map<String,Object> props = new HashMap();
+
+        if (msgCtx.isServerSide()) {
+            // set SOAP Action as a property on the JMS message
+            props.put(BaseConstants.SOAPACTION,(String)msgCtx.getProperty(BaseConstants.SOAPACTION));
+
+        } else {
+            String action = msgCtx.getOptions().getAction();
+            if (action != null) {
+                props.put(BaseConstants.SOAPACTION, action);
+            }
+        }
+
+        msgProps.setApplicationHeaders(props);
+
+        // transport headers
+        Map headerMap = (Map) msgCtx.getProperty(MessageContext.TRANSPORT_HEADERS);
+
+        if (headerMap != null){
+            Iterator iter = headerMap.keySet().iterator();
+            while (iter.hasNext()) {
+
+                String name = (String) iter.next();
+
+                if (AMQPConstants.AMQP_CORELATION_ID.equals(name)) {
+                    msgProps.setCorrelationId((String) headerMap.get(AMQPConstants.AMQP_CORELATION_ID));
+                }
+                else if (AMQPConstants.AMQP_DELIVERY_MODE.equals(name)) {
+                    Object o = headerMap.get(AMQPConstants.AMQP_DELIVERY_MODE);
+                    if (o instanceof Short) {
+                        deliveryProps.setDeliveryMode(((Short) o).shortValue());
+                    }else if (o instanceof Integer) {
+                        deliveryProps.setDeliveryMode(((Integer) o).shortValue());
+                    }else if (o instanceof String) {
+                        try {
+                            deliveryProps.setDeliveryMode(Short.parseShort((String) o));
+                        } catch (NumberFormatException nfe) {
+                            log.warn("Invalid delivery mode ignored : " + o, nfe);
+                        }
+                    } else {
+                        log.warn("Invalid delivery mode ignored : " + o);
+                    }
+                }
+                else if (AMQPConstants.AMQP_EXPIRATION.equals(name)) {
+                    deliveryProps.setExpiration(
+                        Long.parseLong((String) headerMap.get(AMQPConstants.AMQP_EXPIRATION)));
+                }
+                else if (AMQPConstants.AMQP_MESSAGE_ID.equals(name)) {
+                    msgProps.setMessageId((String) headerMap.get(AMQPConstants.AMQP_MESSAGE_ID));
+                }
+                else if (AMQPConstants.AMQP_PRIORITY.equals(name)) {
+                    deliveryProps.setPriority(
+                        Short.parseShort((String) headerMap.get(AMQPConstants.AMQP_PRIORITY)));
+                }
+                else if (AMQPConstants.AMQP_TIMESTAMP.equals(name)) {
+                    deliveryProps.setTimestamp(
+                        Long.parseLong((String) headerMap.get(AMQPConstants.AMQP_TIMESTAMP)));
+                }else {
+                    // custom app props
+                    Object value = headerMap.get(name);
+                    props.put(name, value);
+                }
+            }
+        }
+        // If it's request/response, then we need to fill in corelation id and reply to properties
+        if (waitForResponse && msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID) == null) {
+            msgProps.setCorrelationId(UUIDGenerator.getUUID());
+            if (msgProps.getReplyTo() == null){
+                // We need to use a temp queue here.
+                String tempQueueName = "Queue_" + msgProps.getCorrelationId();
+                synchronized(session){
+                    session.queueDeclare(tempQueueName, null, null, Option.AUTO_DELETE,Option.EXCLUSIVE);
+                    session.queueBind(tempQueueName, "amq.direct", tempQueueName, null);
+                    session.sync();
+                }
+                msgProps.replyTo(new ReplyTo("amq.direct",tempQueueName));
+            }
+        }
+    }
+
+    private byte[] createMessageData(MessageContext msgContext){
+        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+        MessageFormatter messageFormatter = null;
+        try {
+            messageFormatter = TransportUtils.getMessageFormatter(msgContext);
+        } catch (AxisFault axisFault) {
+            throw new AMQPSynapseException("Unable to get the message formatter to use",axisFault);
+        }
+
+        String contentType = messageFormatter.getContentType(
+            msgContext, format, msgContext.getSoapAction());
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            messageFormatter.writeTo(msgContext, format, baos, true);
+            baos.flush();
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new AMQPSynapseException("IO Error while creating message", e);
+        }
+    }
+
+    private void waitForResponseAndProcess(Session session, MessageProperties msgProps,
+            MessageContext msgCtx) throws AxisFault {
+
+        long timeout = AMQPConstants.DEFAULT_AMQP_TIMEOUT;
+        String waitReply = (String) msgCtx.getProperty(AMQPConstants.AMQP_WAIT_REPLY);
+        if (waitReply != null) {
+            timeout = Long.valueOf(waitReply).longValue();
+        }
+        // We are using the routing key (which is the queue name) as the destination
+        String destination = msgProps.getReplyTo().getRoutingKey();
+        MessageManager listener = new MessageManager(session,destination,msgProps.getCorrelationId());
+        session.messageSubscribe(msgProps.getReplyTo().getRoutingKey(),
+                                 destination,
+                                 Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                 Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
+                                 new MessagePartListenerAdapter(listener), null, Option.NO_OPTION);
+
+        Message reply = listener.receive(timeout);
+
+        if (reply != null) {
+            processSyncResponse(msgCtx, reply);
+
+        } else {
+            log.warn("Did not receive a response within " +
+                timeout + " ms to destination : " + msgProps.getReplyTo().getRoutingKey() +
+                " with correlation ID : " + msgProps.getCorrelationId());
+        }
+    }
+
+    private void processSyncResponse(MessageContext outMsgCtx, Message message) throws AxisFault {
+
+        MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
+
+        // load any transport headers from received message
+        Map map = AMQPUtils.getTransportHeaders(message);
+        responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, map);
+
+        // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response
+        // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This
+        // question is still under debate and due to the timelines, I am commiting this
+        // workaround as Axis2 1.2 is about to be released and Synapse 1.0
+        responseMsgCtx.setServerSide(false);
+
+        String contentType = JMSUtils.getInstace().getProperty(message, BaseConstants.CONTENT_TYPE);
+
+        AMQPUtils.getInstace().setSOAPEnvelope(message, responseMsgCtx, contentType);
+        responseMsgCtx.setServerSide(true);
+
+        handleIncomingMessage(
+            responseMsgCtx,
+            map,
+            (String)map.get(BaseConstants.SOAPACTION),
+            contentType
+        );
+    }
+
+    private class ConnectionDetails{
+        private Connection _conn;
+        //private Map _sessions = new HashMap();
+        private Session _session;
+
+        public ConnectionDetails(Connection conn){
+            _conn = conn;
+        }
+
+        public Session getSession(){
+          if (_session != null){
+              _session = _conn.createSession(0);
+          }
+          return _session;
+       }
+    }
+
+}
\ No newline at end of file

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSynapseException.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,12 @@
+package org.apache.synapse.transport.amqp;
+
+public class AMQPSynapseException extends RuntimeException
+{
+    public AMQPSynapseException(String s, Exception e){
+        super(s,e);
+    }
+
+    public AMQPSynapseException(String s){
+        super(s);
+    }
+}

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,134 @@
+package org.apache.synapse.transport.amqp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSConstants;
+
+public class AMQPUtils extends BaseUtils
+{
+
+    private static final Log log = LogFactory.getLog(AMQPUtils.class);
+
+    private static BaseUtils _instance = new AMQPUtils();
+
+    public static BaseUtils getInstace() {
+        return _instance;
+    }
+
+    @Override
+    public InputStream getInputStream(Object message)
+    {
+        Message msg = (Message)message;
+        try{
+            final ByteBuffer buf = msg.readData();
+            return new InputStream() {
+                public synchronized int read() throws IOException {
+                    if (!buf.hasRemaining()) {
+                        return -1;
+                    }
+                    return buf.get();
+                }
+
+                public synchronized int read(byte[] bytes, int off, int len) throws IOException {
+                    // Read only what's left
+                    len = Math.min(len, buf.remaining());
+                    buf.get(bytes, off, len);
+                    return len;
+                }
+            };
+        }catch(IOException e){
+            throw new AMQPSynapseException("Error reading payload",e);
+        }
+    }
+
+    @Override
+    public byte[] getMessageBinaryPayload(Object message)
+    {
+        return null;
+    }
+
+    @Override
+    public String getMessageTextPayload(Object message)
+    {
+        return null;
+    }
+
+    @Override
+    public String getProperty(Object message, String property)
+    {
+        try {
+            return (String)((Message)message).getMessageProperties().getApplicationHeaders().get(property);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * Extract transport level headers for JMS from the given message into a Map
+     *
+     * @param message the JMS message
+     * @return a Map of the transport headers
+     */
+    public static Map getTransportHeaders(Message message) {
+        // create a Map to hold transport headers
+        Map map = new HashMap();
+
+        // correlation ID
+        if (message.getMessageProperties().getCorrelationId() != null) {
+            map.put(AMQPConstants.AMQP_CORELATION_ID, message.getMessageProperties().getCorrelationId());
+        }
+
+        // set the delivery mode as persistent or not
+        try {
+            map.put(AMQPConstants.AMQP_DELIVERY_MODE,message.getDeliveryProperties().getDeliveryMode());
+        } catch (Exception ignore) {}
+
+        // destination name
+        map.put(AMQPConstants.AMQP_EXCHANGE_NAME,message.getDeliveryProperties().getExchange());
+        map.put(AMQPConstants.AMQP_ROUTING_KEY,message.getDeliveryProperties().getRoutingKey());
+
+        // expiration
+        try {
+            map.put(AMQPConstants.AMQP_EXPIRATION, message.getDeliveryProperties().getExpiration());
+        } catch (Exception ignore) {}
+
+        // if a JMS message ID is found
+        if (message.getMessageProperties().getMessageId() != null) {
+            map.put(AMQPConstants.AMQP_MESSAGE_ID, message.getMessageProperties().getMessageId());
+        }
+
+        // priority
+        map.put(AMQPConstants.AMQP_PRIORITY,message.getDeliveryProperties().getPriority());
+
+        // redelivered
+        map.put(AMQPConstants.AMQP_REDELIVERED, message.getDeliveryProperties().getRedelivered());
+
+        // replyto destination name
+        if (message.getMessageProperties().getReplyTo() != null) {
+            map.put(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME, message.getMessageProperties().getReplyTo().getExchangeName());
+            map.put(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY, message.getMessageProperties().getReplyTo().getRoutingKey());
+        }
+
+        // priority
+        map.put(AMQPConstants.AMQP_TIMESTAMP,message.getDeliveryProperties().getTimestamp());
+
+        // any other transport properties / headers
+        map.putAll(message.getMessageProperties().getApplicationHeaders());
+
+        return map;
+    }
+}

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/MessageManager.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,72 @@
+package org.apache.synapse.transport.amqp;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.transport.RangeSet;
+
+public class MessageManager implements MessageListener
+{
+    private static final Log log = LogFactory.getLog(AMQPSender.class);
+    private ArrayBlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(1,true);
+    private Session session;
+    private String destination;
+    private String corelationId;
+
+    public MessageManager(Session session, String destination,String corelationId){
+        this.session = session;
+        this.destination = destination;
+    }
+
+    /*
+     *  when this mehtod is called, it is assumed that we have exclusive access
+     *  to the session.
+     */
+    public Message receive(long timeout){
+        Message m;
+        session.messageFlow(destination,Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+        session.messageFlow(destination,Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+        try{
+            m = queue.poll(timeout, TimeUnit.MILLISECONDS);
+        }catch(Exception e){
+            throw new AMQPSynapseException("unable to receive message",e);
+        }
+
+        if (m == null)
+        {
+            log.debug("Message Didn't arrive in time, checking if one is inflight");
+            // checking if one is inflight
+            session.messageFlush(destination);
+            session.sync();
+            try{
+                m = queue.take();
+            }catch(Exception e){
+                throw new AMQPSynapseException("unable to receive message",e);
+            }
+        }
+
+        return m;
+    }
+
+    public void onMessage(Message m)
+    {
+        System.out.println("\n================== Received Msg ==================");
+        System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+        System.out.println(m.toString());
+        System.out.println("================== End Msg ==================\n");
+
+        //AMQP currently doesn't support server side filters, so doing client side temporarily
+        if(corelationId.equals(m.getMessageProperties().getCorrelationId())){
+            queue.add(m);
+        }else{
+            RangeSet r = new RangeSet();
+            r.add(m.getMessageTransferId());
+            session.messageRelease(r);
+        }
+    }
+}

Added: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java?rev=609189&view=auto
==============================================================================
--- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java (added)
+++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java Sat Jan  5 10:35:46 2008
@@ -0,0 +1,34 @@
+package org.apache.synapse.transport.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** sample uri formats - this is temporary until the AMQP WG defines a proper addressing scheme
+*
+* uri="amqp:/direct?transport.amqp.RoutingKey=SimpleStockQuoteService&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
+* amqp:/topic?transport.amqp.RoutingKey=weather.us.ny&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672
+*/
+public class URIParser
+{
+
+    public static Map parse(String uri){
+       Map props = new HashMap();
+       String dest = uri.substring(6,uri.indexOf("?"));
+       if (dest == null || dest.trim().equals("")){
+          throw new IllegalArgumentException("destination cannot be null");
+       }
+       props.put(AMQPConstants.EXCHANGE_NAME_PARAM, dest);
+       String paramStr =  uri.substring(uri.indexOf("?")+1,uri.length());
+       String[] params = paramStr.split("&amp;");
+       for (String param:params){
+           String key = param.substring(0,param.indexOf("="));
+           String value = param.substring(param.indexOf("=")+1,param.length());
+           props.put(key, value);
+       }
+       return props;
+    }
+
+    public static void main(String[] args){
+        Map p = URIParser.parse("amqp:/direct?routing_key=SimpleStockQuoteService&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672");
+    }
+}

Modified: webservices/synapse/trunk/java/pom.xml
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/pom.xml?rev=609189&r1=609188&r2=609189&view=diff
==============================================================================
--- webservices/synapse/trunk/java/pom.xml (original)
+++ webservices/synapse/trunk/java/pom.xml Sat Jan  5 10:35:46 2008
@@ -673,6 +673,16 @@
             <version>${commons.vfs.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-common</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
             <groupId>de.schlichtherle.io</groupId>
             <artifactId>truezip</artifactId>
             <version>${truezip.version}</version>
@@ -1034,6 +1044,17 @@
                 <updatePolicy>daily</updatePolicy>
             </snapshots>
         </repository>
+        <repository>
+            <id>qpid-private</id>
+            <name>Private repo for qpid jars</name>
+            <url>http://people.apache.org/~rajith/maven2/</url>
+            <releases>
+                <updatePolicy>never</updatePolicy>
+            </releases>
+            <snapshots>
+                <updatePolicy>daily</updatePolicy>
+            </snapshots>
+        </repository>      
     </repositories>
 
     <modules>
@@ -1109,6 +1130,8 @@
         <bsf.version>3.0-beta2</bsf.version>
         <groovy.version>1.0</groovy.version>
         <servlet-api.version>2.3</servlet-api.version>
+
+        <qpid.version>1.0-incubating-M2-SNAPSHOT</qpid.version>
 
     </properties>
 



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org