You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ra...@apache.org on 2008/02/01 02:52:30 UTC

svn commit: r617324 - /synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/

Author: rajith
Date: Thu Jan 31 17:52:27 2008
New Revision: 617324

URL: http://svn.apache.org/viewvc?rev=617324&view=rev
Log:
Added more functionality to AMQPListener, but still more work needs to be done before it is functional.

Added:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java
Modified:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java

Added: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java?rev=617324&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java (added)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java Thu Jan 31 17:52:27 2008
@@ -0,0 +1,58 @@
+package org.apache.synapse.transport.amqp;
+
+
+public class AMQPBinding {
+
+    private String exchangeName = "amq.direct";
+    private String exchangeType = "direct";
+    private String routingKey;
+    private boolean primary;
+
+    public AMQPBinding(){
+    }
+
+    public AMQPBinding(String exchangeName, String exchangeType, String routingKey, boolean primary)
+    {
+        super();
+        this.exchangeName = exchangeName;
+        this.exchangeType = exchangeType;
+        this.routingKey = routingKey;
+        this.primary = primary;
+    }
+
+    public String getExchangeName()
+    {
+        return exchangeName;
+    }
+    public void setExchangeName(String exchangeName)
+    {
+        this.exchangeName = exchangeName;
+    }
+    public String getExchangeType()
+    {
+        return exchangeType;
+    }
+    public void setExchangeType(String exchangeType)
+    {
+        this.exchangeType = exchangeType;
+    }
+    public String getRoutingKey()
+    {
+        return routingKey;
+    }
+    public void setRoutingKey(String routingKey)
+    {
+        this.routingKey = routingKey;
+    }
+
+    public boolean isPrimary()
+    {
+        return primary;
+    }
+
+    public void setPrimary(boolean primary)
+    {
+        this.primary = primary;
+    }
+
+}

Added: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java?rev=617324&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java (added)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java Thu Jan 31 17:52:27 2008
@@ -0,0 +1,146 @@
+package org.apache.synapse.transport.amqp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.ErrorCode;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.ClosedListener;
+import org.apache.qpidity.nclient.Connection;
+
+
+class AMQPConnection implements ClosedListener{
+
+    private static final Log log = LogFactory.getLog(AMQPConnection.class);
+
+    /** Connection name as identified in the axis2.xml */
+    private String name;
+
+    /** The AMQP URL */
+    private String url;
+
+    /** the AMQP connection */
+    private Connection con;
+
+    /** the exchange name to use */
+    private String exchangeName = "amq.direct";
+
+    /** the exchangeType to use */
+    private String exchangeType = "direct";
+
+    /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
+    private long reconnectTimeout = 30000;
+
+    public AMQPConnection()
+    {
+    }
+
+    public AMQPConnection(String name, String url, String exchangeName, String exchangeType)
+    {
+        super();
+        this.name = name;
+        this.url = url;
+        this.exchangeName = exchangeName;
+        this.exchangeType = exchangeType;
+    }
+
+    public String getExchangeName()
+    {
+        return exchangeName;
+    }
+
+    public void setExchangeName(String exchangeName)
+    {
+        this.exchangeName = exchangeName;
+    }
+
+    public String getExchangeType()
+    {
+        return exchangeType;
+    }
+
+    public void setExchangeType(String exchangeType)
+    {
+        this.exchangeType = exchangeType;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public void setName(String name)
+    {
+        this.name = name;
+    }
+
+    public String getUrl()
+    {
+        return url;
+    }
+
+    public void setUrl(String url)
+    {
+        this.url = url;
+    }
+
+    public Connection getConnection()
+    {
+        return con;
+    }
+
+    public void setConnection(Connection con)
+    {
+        this.con = con;
+    }
+
+    public long getReconnectTimeout() {
+        return reconnectTimeout;
+    }
+
+    public void setReconnectTimeout(long reconnectTimeout) {
+        this.reconnectTimeout = reconnectTimeout;
+    }
+
+    public void stop(){
+
+    }
+
+    public void start() throws AMQPSynapseException
+    {
+        Connection con = Client.createConnection();
+        try{
+            con.connect(url);
+        }catch(Exception e){
+            throw new AMQPSynapseException("Error creating a connection to the broker",e);
+        }
+    }
+
+    public void onClosed(ErrorCode errorCode, String reason)
+    {
+        log.error("AMQP connection " + name + " encountered an error, Error code:" + errorCode + " reason:" + reason);
+        boolean wasError = true;
+
+        // try to connect
+        // if error occurs wait and try again
+        while (wasError == true) {
+
+            try {
+               // connectAndListen();
+                wasError = false;
+
+            } catch (Exception e1) {
+                log.warn("AMQP reconnection attempt failed for connection : " + name,e1);
+            }
+
+            if (wasError == true) {
+                try {
+                    log.info("Attempting reconnection for connection " + name +
+                        " in " + getReconnectTimeout()/1000 +  " seconds");
+                    Thread.sleep(getReconnectTimeout());
+                } catch (InterruptedException ignore) {}
+            }
+        } // wasError
+    }
+
+
+}

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java Thu Jan 31 17:52:27 2008
@@ -21,17 +21,31 @@
      */
     public static final long DEFAULT_AMQP_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
 
-    //-------------------------- services.xml parameters --------------------------------
-    /**
-     * The Parameter name indicating the amqp destination for requests
-     */
+    //-------------------------- axis2.xml parameters --------------------------------
+    /** Connection URL specified in the axis2.xml or services.xml */
     public static final String CONNECTION_URL_PARAM = "transport.amqp.ConnectionURL";
 
+    /** default exchange name specified axis2.xml */
     public static final String EXCHANGE_NAME_PARAM = "transport.amqp.ExchangeName";
 
+    /** default exchange type specified axis2.xml */
     public static final String EXCHANGE_TYPE_PARAM = "transport.amqp.ExchangeType";
 
-    public static final String ROUTING_KEY_PARAM = "transport.amqp.RoutingKey";
+    //-------------------------- services.xml parameters --------------------------------
+    /** routing key specified in the services.xml */
+    public static final String BINDING_ROUTING_KEY_ATTR = "routingKey";
+
+    /** exchange name specified in the services.xml */
+    public static final String BINDING_EXCHANGE_NAME_ATTR = "exchangeName";
+
+    /** exchange type specified in the services.xml */
+    public static final String BINDING_EXCHANGE_TYPE_ATTR = "exchangeType";
+
+    /** bindings specified in the services.xml */
+    public static final String BINDINGS_PARAM = "transport.amqp.Bindings";
+
+    /** bindings specified in the services.xml */
+    public static final String BINDINGS_PRIMARY_ATTR = "primary";
 
     /**
      * The Parameter name indicating the response AMQP destination
@@ -47,7 +61,7 @@
      * The Parameter name of an Axis2 service, indicating the AMQP connection
      * which should be used to listen for messages for it.
      */
-    public static final String CONNECTION_PARAM = "transport.amqp.Connection";
+    public static final String CONNECTION_NAME_PARAM = "transport.amqp.ConnectionName";
     /**
      * If reconnect timeout if connection error occurs in seconds
      */

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java Thu Jan 31 17:52:27 2008
@@ -1,22 +1,109 @@
 package org.apache.synapse.transport.amqp;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
+import org.apache.axis2.description.TransportInDescription;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
 import org.apache.synapse.transport.base.AbstractTransportListener;
+import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSConnectionFactory;
+import org.apache.synapse.transport.jms.JMSConstants;
+import org.apache.synapse.transport.jms.JMSUtils;
 
 public class AMQPListener extends AbstractTransportListener
 {
-    public static final String TRANSPORT_NAME = "jms";
+    public static final String TRANSPORT_NAME = "AMQP";
     private static final Log log = LogFactory.getLog(AMQPListener.class);
 
+
+    /** A Map containing the AMQP connections managed by this, keyed by name */
+    private Map<String, AMQPConnection> connections = new HashMap<String, AMQPConnection>();
+    /** A Map of service name to the AMQP EPR addresses */
+    private Map serviceNameToEPRMap = new HashMap();
+
+    @Override
+    public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn) throws AxisFault
+    {
+        setTransportName(TRANSPORT_NAME);
+        super.init(cfgCtx, transportIn);
+        loadConnectionDefinitions(transportIn);
+
+        if (connections.isEmpty()) {
+            log.warn("No AMQP connections are defined. Cannot listen on AMQP");
+            return;
+        }
+
+        log.info("AMQP Transport Receiver/Listener initialized...");
+    }
+
+
+
+    @Override
+    public void start() throws AxisFault
+    {
+       for(String conName: connections.keySet()){
+          AMQPConnection conDef = connections.get(conName);
+          conDef.start();
+       }
+        super.start();
+    }
+
+
+    // Need to clean up the sessions as well
+    @Override
+    public void stop() throws AxisFault
+    {
+        for(String conName: connections.keySet()){
+            AMQPConnection connection = connections.get(conName);
+            try{
+                connection.stop();
+            }catch(Exception e){
+                throw new AMQPSynapseException("Error creating a connection to the broker",e);
+            }
+        }
+        super.stop();
+    }
+
+
     @Override
     protected void startListeningForService(AxisService service)
     {
-        // TODO Auto-generated method stub
+        if (service.getName().startsWith("__")) {
+            return;
+        }
+
+        AMQPConnection con = getConnectionFactory(service);
+        if (con == null) {
+            String msg = "Service " + service.getName() + " does not specify" +
+                         "an AMQP connection or refers to an invalid connection. " +
+                         "This service is being marked as faulty and will not be " +
+                         "available over the AMQP transport";
+            log.warn(msg);
+            BaseUtils.markServiceAsFaulty(service.getName(), msg, service.getAxisConfiguration());
+            return;
+        }
+
+        // compute service EPR and keep for later use
+        List<AMQPBinding> bindings = AMQPUtils.getBindingsForService(service);
 
+        serviceNameToEPRMap.put(service.getName(), URIParser.getEPR(bindings,con.getUrl()));
+
+        log.info("Starting to listen for service " + service.getName());
+
+        // create bindings for the service
     }
 
     @Override
@@ -26,10 +113,95 @@
 
     }
 
-    public EndpointReference[] getEPRsForService(String arg0, String arg1) throws AxisFault
+    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault
     {
-        // TODO Auto-generated method stub
-        return null;
+        //Strip out the operation name
+        if (serviceName.indexOf('/') != -1) {
+            serviceName = serviceName.substring(0, serviceName.indexOf('/'));
+        }
+        return new EndpointReference[]{
+            new EndpointReference((String) serviceNameToEPRMap.get(serviceName))};
     }
 
+    /**
+     * Create an AMQP Connection instances for the definitions in the transport listener,
+     * and add these map keyed by name
+     *
+     * @param transprtIn the transport-in description for AMQP
+     */
+    private void loadConnectionDefinitions(TransportInDescription transprtIn) {
+
+        // iterate through all defined connection definitions
+        Iterator conIter = transprtIn.getParameters().iterator();
+
+        while (conIter.hasNext()) {
+            Parameter conParams = (Parameter) conIter.next();
+
+            ParameterIncludeImpl pi = new ParameterIncludeImpl();
+            AMQPConnection conDef = new AMQPConnection();
+            try {
+                pi.deserializeParameters((OMElement) conParams.getValue());
+            } catch (AxisFault axisFault) {
+                log.error("Error reading parameters for AMQP Connection definitions" +
+                        conParams.getName(), axisFault);
+            }
+            conDef.setName((String)conParams.getValue());
+
+            Iterator params = pi.getParameters().iterator();
+            while (params.hasNext()) {
+
+                Parameter p = (Parameter) params.next();
+
+                if (AMQPConstants.CONNECTION_URL_PARAM.equals(p.getName())) {
+                    conDef.setUrl((String) p.getValue());
+                }
+                else if (AMQPConstants.EXCHANGE_NAME_PARAM.equals(p.getName())) {
+                    conDef.setExchangeName((String) p.getValue());
+                }
+                else if (AMQPConstants.EXCHANGE_TYPE_PARAM.equals(p.getName())) {
+                    conDef.setExchangeType((String) p.getValue());
+                }
+            }
+
+            connections.put(conDef.getName(), conDef);
+        }
+    }
+
+    /**
+     * Return the connection for this service. If this service
+     * refers to an invalid connection or defaults to a non-existent default
+     * connection, this returns null
+     *
+     * @param service the AxisService
+     * @return the AMQPConnection to be used, or null if reference is invalid
+     */
+    private AMQPConnection getConnectionFactory(AxisService service) {
+        Parameter conNameParam = service.getParameter(AMQPConstants.CONNECTION_NAME_PARAM);
+        Parameter conURLParam = service.getParameter(AMQPConstants.CONNECTION_URL_PARAM);
+
+        // validate connection factory name (specified or default)
+        if (conNameParam != null) {
+            String conFac = (String) conNameParam.getValue();
+            if (connections.containsKey(conFac)) {
+                return (AMQPConnection) connections.get(conFac);
+            } else {
+                return null;
+            }
+
+        // Next see if service defines it's own connection
+        }else if (conURLParam != null){
+            AMQPConnection con = new AMQPConnection();
+            con.setUrl((String)conURLParam.getValue());
+            con.start();
+            connections.put(service.getName(), con);
+            return con;
+
+        // Next see if there is a default defined in axis2.xml
+        }else if (connections.containsKey(AMQPConstants.DEFAULT_CONNECTION)) {
+            return (AMQPConnection) connections.get(AMQPConstants.DEFAULT_CONNECTION);
+
+        } else {
+            return null;
+        }
+    }
 }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java Thu Jan 31 17:52:27 2008
@@ -24,9 +24,13 @@
         } else {
            Map props = URIParser.parse(address);
            conURL = (String)props.get(AMQPConstants.CONNECTION_URL_PARAM);
-           routingKey = (String)props.get(AMQPConstants.ROUTING_KEY_PARAM);
+           routingKey = (String)props.get(AMQPConstants.BINDING_ROUTING_KEY_ATTR);
            exchangeName = (String)props.get(AMQPConstants.EXCHANGE_NAME_PARAM);
         }
+    }
+
+    public String getAddress(){
+        return address;
     }
 
     public String getConnectionURL(){

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java Thu Jan 31 17:52:27 2008
@@ -60,8 +60,6 @@
         // If targetEPR is not null, determine the addressing info from it
         if (targetEPR != null) {
             amqpTransportInfo = new AMQPOutTransportInfo(targetEPR);
-            // do we have a definition for a connection factory to use for this address?
-
         }
         // If not try to get the addressing info from the transport description
         else if (outTransportInfo != null && outTransportInfo instanceof AMQPOutTransportInfo) {
@@ -86,7 +84,6 @@
         }
 
         byte[] message = null;
-        String correlationId = null;
         try {
             message = createMessageData(msgCtx);
         } catch (AMQPSynapseException e) {
@@ -119,22 +116,6 @@
         deliveryProps.setExchange(amqpTransportInfo.getExchangeName());
         deliveryProps.setRoutingKey(amqpTransportInfo.getRoutingKey());
 
-        /* For efficiency I assume that the reply to exchange and destination is already created
-        *  If the reply is for the same service, then this should be the queue that the service is listening to
-        *  Blindly creating these exchanges,queues and bindings is sub optimal and can be avoid if the administrator
-        *  creates the nessacery exchanges,queues and bindings before hand.
-        *
-        *  If the service hasn't specify and it's a request/reply MEP then a temporary queue
-        *  (which is auto-deleted) is created and bound to the amq.direct exchange.
-        */
-        if (msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null){
-            String replyExchangeName = (String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME);
-            String replyRoutingKey = msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY)!= null?(String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY):null;
-
-            // for fannout exchange or some other custom exchange, the routing key maybe null
-            msgProps.setReplyTo(new ReplyTo(replyExchangeName,replyRoutingKey));
-        }
-
         // Content type
         OMOutputFormat format = BaseUtils.getOMOutputFormat(msgCtx);
         MessageFormatter messageFormatter = null;
@@ -149,10 +130,10 @@
         msgProps.setContentType(contentType);
 
         // Custom properties - SOAP ACTION
-        Map<String,Object> props = new HashMap();
+        Map<String,Object> props = new HashMap<String,Object>();
 
         if (msgCtx.isServerSide()) {
-            // set SOAP Action as a property on the JMS message
+            // set SOAP Action as a property on the message
             props.put(BaseConstants.SOAPACTION,(String)msgCtx.getProperty(BaseConstants.SOAPACTION));
 
         } else {
@@ -175,6 +156,7 @@
 
                 if (AMQPConstants.AMQP_CORELATION_ID.equals(name)) {
                     msgProps.setCorrelationId((String) headerMap.get(AMQPConstants.AMQP_CORELATION_ID));
+                    // If it's request/response, then we need to fill in corelation id and reply to properties
                 }
                 else if (AMQPConstants.AMQP_DELIVERY_MODE.equals(name)) {
                     Object o = headerMap.get(AMQPConstants.AMQP_DELIVERY_MODE);
@@ -213,20 +195,46 @@
                 }
             }
         }
-        // If it's request/response, then we need to fill in corelation id and reply to properties
-        if (waitForResponse && msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID) == null) {
-            msgProps.setCorrelationId(UUIDGenerator.getUUID());
-            if (msgProps.getReplyTo() == null){
-                // We need to use a temp queue here.
-                String tempQueueName = "Queue_" + msgProps.getCorrelationId();
-                synchronized(session){
-                    session.queueDeclare(tempQueueName, null, null, Option.AUTO_DELETE,Option.EXCLUSIVE);
-                    session.queueBind(tempQueueName, "amq.direct", tempQueueName, null);
-                    session.sync();
-                }
-                msgProps.replyTo(new ReplyTo("amq.direct",tempQueueName));
-            }
-        }
+
+        /* For efficiency I assume that the reply to exchange and destination is already created
+         *  If the reply is for the same service, then this should be the queue that the service is listening to.
+         *  Blindly creating these exchanges,queues and bindings is sub optimal and can be avoid if the administrator
+         *  creates the nessacery exchanges,queues and bindings before hand.
+         *
+         *  If the service hasn't specify and it's a request/reply MEP then a temporary queue
+         *  (which is auto-deleted) is created and bound to the amq.direct exchange.
+         */
+         if (msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null){
+             String replyExchangeName = (String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME);
+             String replyRoutingKey = msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY)!= null?(String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY):null;
+
+             // for fannout exchange or some other custom exchange, the routing key maybe null
+             msgProps.setReplyTo(new ReplyTo(replyExchangeName,replyRoutingKey));
+         }
+
+         // If it's request/response, then we need to fill in reply to properties and correlation_id
+         if (waitForResponse){
+
+             if (waitForResponse && msgProps.getCorrelationId() == null) {
+                 if (msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID) != null){
+                     msgProps.setCorrelationId((String)msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID));
+                 }else{
+                     msgProps.setCorrelationId(UUIDGenerator.getUUID());
+                 }
+
+             }
+
+             if (msgProps.getReplyTo() == null){
+                 //We need to use a temp queue here.
+                 String tempQueueName = "Queue_" + msgProps.getCorrelationId();
+                 synchronized(session){
+                     session.queueDeclare(tempQueueName, null, null, Option.AUTO_DELETE,Option.EXCLUSIVE);
+                     session.queueBind(tempQueueName, "amq.direct", tempQueueName, null);
+                     session.sync();
+                 }
+                 msgProps.replyTo(new ReplyTo("amq.direct",tempQueueName));
+             }
+         }
     }
 
     private byte[] createMessageData(MessageContext msgContext){
@@ -265,7 +273,7 @@
         session.messageSubscribe(msgProps.getReplyTo().getRoutingKey(),
                                  destination,
                                  Session.TRANSFER_CONFIRM_MODE_REQUIRED,
-                                 Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
+                                 Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
                                  new MessagePartListenerAdapter(listener), null, Option.NO_OPTION);
 
         Message reply = listener.receive(timeout);

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java Thu Jan 31 17:52:27 2008
@@ -3,19 +3,25 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.Enumeration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Topic;
+import javax.xml.namespace.QName;
 
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.qpidity.api.Message;
 import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSConnectionFactory;
 import org.apache.synapse.transport.jms.JMSConstants;
 
 public class AMQPUtils extends BaseUtils
@@ -85,7 +91,7 @@
      */
     public static Map getTransportHeaders(Message message) {
         // create a Map to hold transport headers
-        Map map = new HashMap();
+        Map<String,Object> map = new HashMap<String,Object>();
 
         // correlation ID
         if (message.getMessageProperties().getCorrelationId() != null) {
@@ -131,4 +137,53 @@
 
         return map;
     }
+
+    /**
+     * Get the AMQP destination used by this service
+     *
+     * @param service the Axis Service
+     * @return the name of the JMS destination
+     */
+    public static List<AMQPBinding> getBindingsForService(AxisService service) {
+        Parameter bindingsParam = service.getParameter(AMQPConstants.BINDINGS_PARAM);
+        ParameterIncludeImpl pi = new ParameterIncludeImpl();
+        try {
+            pi.deserializeParameters((OMElement) bindingsParam.getValue());
+        } catch (AxisFault axisFault) {
+            log.error("Error reading parameters for AMQP binding definitions" +
+                    bindingsParam.getName(), axisFault);
+        }
+
+        Iterator params = pi.getParameters().iterator();
+        ArrayList<AMQPBinding> list = new ArrayList<AMQPBinding>();
+        if(params.hasNext())
+        {
+            while (params.hasNext())
+            {
+                Parameter p = (Parameter) params.next();
+                AMQPBinding binding = new AMQPBinding();
+                OMAttribute exchangeTypeAttr = p.getParameterElement().getAttribute(new QName(AMQPConstants.BINDING_EXCHANGE_TYPE_ATTR));
+                OMAttribute exchangeNameAttr = p.getParameterElement().getAttribute(new QName(AMQPConstants.BINDING_EXCHANGE_NAME_ATTR));
+                OMAttribute routingKeyAttr = p.getParameterElement().getAttribute(new QName(AMQPConstants.BINDING_ROUTING_KEY_ATTR));
+                OMAttribute primaryAttr = p.getParameterElement().getAttribute(new QName(AMQPConstants.BINDINGS_PRIMARY_ATTR));
+
+                if ( exchangeTypeAttr != null) {
+                    binding.setExchangeType(exchangeTypeAttr.getAttributeValue());
+                }else if ( exchangeNameAttr != null) {
+                    binding.setExchangeName(exchangeNameAttr.getAttributeValue());
+                }else if ( primaryAttr != null) {
+                    binding.setPrimary(true);
+                }
+                list.add(binding);
+            }
+        }else{
+            // go for the defaults
+            AMQPBinding binding = new AMQPBinding();
+            binding.setRoutingKey(service.getName());
+            list.add(binding);
+        }
+
+        return list;
+    }
+
 }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java Thu Jan 31 17:52:27 2008
@@ -1,31 +1,78 @@
 package org.apache.synapse.transport.amqp;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** sample uri formats - this is temporary until the AMQP WG defines a proper addressing scheme
 *
-* uri="amqp:/direct?transport.amqp.RoutingKey=SimpleStockQuoteService&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
-* amqp:/topic?transport.amqp.RoutingKey=weather.us.ny&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672
+* uri="amqp:/direct/amq.direct?transport.amqp.RoutingKey=SimpleStockQuoteService&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
+* amqp:/topic/amq.topic?transport.amqp.RoutingKey=weather.us.ny&amp;transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672
 */
 public class URIParser
 {
 
     public static Map parse(String uri){
        Map props = new HashMap();
-       String dest = uri.substring(6,uri.indexOf("?"));
-       if (dest == null || dest.trim().equals("")){
-          throw new IllegalArgumentException("destination cannot be null");
+       String temp = uri.substring(6,uri.indexOf("?"));
+       String exchangeType =  temp.substring(0,temp.indexOf("/"));
+       String exchangeName =  temp.substring(temp.indexOf("/"),temp.length());
+       if (exchangeType == null || exchangeType.trim().equals("")){
+          throw new IllegalArgumentException("exchange type cannot be null");
        }
-       props.put(AMQPConstants.EXCHANGE_NAME_PARAM, dest);
+       if (exchangeType == null || exchangeType.trim().equals("")){
+           throw new IllegalArgumentException("exchange name cannot be null");
+        }
+       props.put(AMQPConstants.EXCHANGE_NAME_PARAM, exchangeName);
+       props.put(AMQPConstants.EXCHANGE_TYPE_PARAM, exchangeType);
        String paramStr =  uri.substring(uri.indexOf("?")+1,uri.length());
        String[] params = paramStr.split("&amp;");
        for (String param:params){
            String key = param.substring(0,param.indexOf("="));
            String value = param.substring(param.indexOf("=")+1,param.length());
+           if("connectionURL".equals(key)){
+               key = AMQPConstants.CONNECTION_URL_PARAM;
+           }
            props.put(key, value);
        }
        return props;
+    }
+
+    /**
+     * Get the EPR for the given AMQP details
+     * the form of the URL is
+     * uri="amqp:/direct?routingKey=SimpleStockQuoteService&amp;connectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
+     *
+     * Creates the EPR with the primary binding
+     *
+     */
+    public static String getEPR(List<AMQPBinding> list, String url) {
+
+        String epr = null;
+        for (AMQPBinding binding:list){
+
+            if (binding.isPrimary()){
+                StringBuffer sb = new StringBuffer();
+                sb.append(AMQPConstants.AMQP_PREFIX).append("/").append(binding.getExchangeType());
+                sb.append("/").append(binding.getExchangeName());
+                sb.append("?").append(AMQPConstants.BINDING_ROUTING_KEY_ATTR).append("=").append(binding.getRoutingKey());
+                sb.append("&amp;").append("connectionURL=").append(url);
+                epr = sb.toString();
+            }
+        }
+
+        // If no primary is defined just get the first
+        if(epr == null){
+            AMQPBinding binding = list.get(0);
+            StringBuffer sb = new StringBuffer();
+            sb.append(AMQPConstants.AMQP_PREFIX).append("/").append(binding.getExchangeType());
+            sb.append("/").append(binding.getExchangeName());
+            sb.append("?").append(AMQPConstants.BINDING_ROUTING_KEY_ATTR).append("=").append(binding.getRoutingKey());
+            sb.append("&amp;").append("connectionURL=").append(url);
+            epr = sb.toString();
+        }
+
+        return epr;
     }
 
     public static void main(String[] args){