You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ra...@apache.org on 2008/02/01 02:52:30 UTC
svn commit: r617324 -
/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/
Author: rajith
Date: Thu Jan 31 17:52:27 2008
New Revision: 617324
URL: http://svn.apache.org/viewvc?rev=617324&view=rev
Log:
Added more functionality to AMQPListener, but still more work needs to be done before it is functional.
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
Added: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java?rev=617324&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java (added)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPBinding.java Thu Jan 31 17:52:27 2008
@@ -0,0 +1,58 @@
+package org.apache.synapse.transport.amqp;
+
+
+/**
+ * Value holder for a single AMQP binding: the exchange (name and type), the
+ * routing key and a flag telling whether this is the primary binding.
+ * A freshly created binding points at the "amq.direct" exchange of type
+ * "direct", has no routing key and is not primary.
+ */
+public class AMQPBinding {
+
+    private String exchangeName = "amq.direct";
+    private String exchangeType = "direct";
+    private String routingKey;
+    private boolean primary;
+
+    /** Creates a binding that keeps the default exchange name and type. */
+    public AMQPBinding() {
+    }
+
+    /**
+     * Creates a fully specified binding.
+     *
+     * @param exchangeName the exchange name
+     * @param exchangeType the exchange type
+     * @param routingKey   the routing key
+     * @param primary      whether this is the primary binding
+     */
+    public AMQPBinding(String exchangeName, String exchangeType, String routingKey, boolean primary) {
+        this.exchangeName = exchangeName;
+        this.exchangeType = exchangeType;
+        this.routingKey = routingKey;
+        this.primary = primary;
+    }
+
+    /** @return the exchange name */
+    public String getExchangeName() {
+        return this.exchangeName;
+    }
+
+    /** @param exchangeName the exchange name to use */
+    public void setExchangeName(String exchangeName) {
+        this.exchangeName = exchangeName;
+    }
+
+    /** @return the exchange type */
+    public String getExchangeType() {
+        return this.exchangeType;
+    }
+
+    /** @param exchangeType the exchange type to use */
+    public void setExchangeType(String exchangeType) {
+        this.exchangeType = exchangeType;
+    }
+
+    /** @return the routing key, or null if none was set */
+    public String getRoutingKey() {
+        return this.routingKey;
+    }
+
+    /** @param routingKey the routing key to use */
+    public void setRoutingKey(String routingKey) {
+        this.routingKey = routingKey;
+    }
+
+    /** @return true if this binding is flagged as the primary one */
+    public boolean isPrimary() {
+        return this.primary;
+    }
+
+    /** @param primary true to flag this binding as the primary one */
+    public void setPrimary(boolean primary) {
+        this.primary = primary;
+    }
+
+}
Added: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java?rev=617324&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java (added)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConnection.java Thu Jan 31 17:52:27 2008
@@ -0,0 +1,146 @@
+package org.apache.synapse.transport.amqp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.ErrorCode;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.ClosedListener;
+import org.apache.qpidity.nclient.Connection;
+
+
+/**
+ * Definition of a named AMQP connection: the broker URL, the default exchange
+ * name and type, the reconnect timeout and the underlying {@link Connection}.
+ * Implements {@link ClosedListener}; {@link #onClosed(ErrorCode, String)} logs
+ * the closure and hosts the reconnect loop, whose reconnect step is not
+ * implemented yet.
+ */
+class AMQPConnection implements ClosedListener {
+
+    private static final Log log = LogFactory.getLog(AMQPConnection.class);
+
+    /** Connection name as identified in the axis2.xml */
+    private String name;
+
+    /** The AMQP URL */
+    private String url;
+
+    /** the AMQP connection; null until {@link #start()} succeeds or a connection is set explicitly */
+    private Connection con;
+
+    /** the exchange name to use */
+    private String exchangeName = "amq.direct";
+
+    /** the exchangeType to use */
+    private String exchangeType = "direct";
+
+    /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
+    private long reconnectTimeout = 30000;
+
+    /** Creates a connection definition that keeps the default exchange name and type. */
+    public AMQPConnection() {
+    }
+
+    /**
+     * Creates a connection definition.
+     *
+     * @param name         the connection name
+     * @param url          the AMQP URL of the broker
+     * @param exchangeName the exchange name to use
+     * @param exchangeType the exchange type to use
+     */
+    public AMQPConnection(String name, String url, String exchangeName, String exchangeType) {
+        this.name = name;
+        this.url = url;
+        this.exchangeName = exchangeName;
+        this.exchangeType = exchangeType;
+    }
+
+    public String getExchangeName() {
+        return exchangeName;
+    }
+
+    public void setExchangeName(String exchangeName) {
+        this.exchangeName = exchangeName;
+    }
+
+    public String getExchangeType() {
+        return exchangeType;
+    }
+
+    public void setExchangeType(String exchangeType) {
+        this.exchangeType = exchangeType;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public Connection getConnection() {
+        return con;
+    }
+
+    public void setConnection(Connection con) {
+        this.con = con;
+    }
+
+    public long getReconnectTimeout() {
+        return reconnectTimeout;
+    }
+
+    public void setReconnectTimeout(long reconnectTimeout) {
+        this.reconnectTimeout = reconnectTimeout;
+    }
+
+    /** Stops this connection. TODO: not implemented yet; nothing is released here. */
+    public void stop() {
+
+    }
+
+    /**
+     * Creates a new connection and connects it to the configured URL. The
+     * connection is stored in this object only after the connect succeeded, so
+     * {@link #getConnection()} returns it afterwards.
+     *
+     * @throws AMQPSynapseException if connecting to the broker fails
+     */
+    public void start() throws AMQPSynapseException {
+        // Use a distinct local name: the original local "con" shadowed the field,
+        // which therefore was never assigned.
+        Connection newConnection = Client.createConnection();
+        try {
+            newConnection.connect(url);
+        } catch (Exception e) {
+            throw new AMQPSynapseException("Error creating a connection to the broker", e);
+        }
+        this.con = newConnection;
+    }
+
+    /**
+     * Logs that the connection was closed and runs the reconnect loop.
+     *
+     * @param errorCode the error code reported for the closure
+     * @param reason    the reason reported for the closure
+     */
+    public void onClosed(ErrorCode errorCode, String reason) {
+        log.error("AMQP connection " + name + " encountered an error, Error code:" + errorCode + " reason:" + reason);
+        boolean wasError = true;
+
+        // try to connect; if an error occurs wait and try again
+        while (wasError) {
+
+            try {
+                // TODO: re-establish the connection here (connectAndListen() does not exist yet)
+                wasError = false;
+
+            } catch (Exception e1) {
+                log.warn("AMQP reconnection attempt failed for connection : " + name, e1);
+            }
+
+            if (wasError) {
+                log.info("Attempting reconnection for connection " + name +
+                    " in " + getReconnectTimeout() / 1000 + " seconds");
+                try {
+                    Thread.sleep(getReconnectTimeout());
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status and stop retrying instead of swallowing it
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        } // wasError
+    }
+
+
+}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPConstants.java Thu Jan 31 17:52:27 2008
@@ -21,17 +21,31 @@
*/
public static final long DEFAULT_AMQP_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
- //-------------------------- services.xml parameters --------------------------------
- /**
- * The Parameter name indicating the amqp destination for requests
- */
+ //-------------------------- axis2.xml parameters --------------------------------
+ /** Connection URL specified in the axis2.xml or services.xml */
public static final String CONNECTION_URL_PARAM = "transport.amqp.ConnectionURL";
+ /** default exchange name specified in the axis2.xml */
public static final String EXCHANGE_NAME_PARAM = "transport.amqp.ExchangeName";
+ /** default exchange type specified in the axis2.xml */
public static final String EXCHANGE_TYPE_PARAM = "transport.amqp.ExchangeType";
- public static final String ROUTING_KEY_PARAM = "transport.amqp.RoutingKey";
+ //-------------------------- services.xml parameters --------------------------------
+ /** routing key specified in the services.xml */
+ public static final String BINDING_ROUTING_KEY_ATTR = "routingKey";
+
+ /** exchange name specified in the services.xml */
+ public static final String BINDING_EXCHANGE_NAME_ATTR = "exchangeName";
+
+ /** exchange type specified in the services.xml */
+ public static final String BINDING_EXCHANGE_TYPE_ATTR = "exchangeType";
+
+ /** bindings specified in the services.xml */
+ public static final String BINDINGS_PARAM = "transport.amqp.Bindings";
+
+ /** attribute marking a binding as the primary binding in the services.xml */
+ public static final String BINDINGS_PRIMARY_ATTR = "primary";
/**
* The Parameter name indicating the response AMQP destination
@@ -47,7 +61,7 @@
* The Parameter name of an Axis2 service, indicating the AMQP connection
* which should be used to listen for messages for it.
*/
- public static final String CONNECTION_PARAM = "transport.amqp.Connection";
+ public static final String CONNECTION_NAME_PARAM = "transport.amqp.ConnectionName";
/**
* If reconnect timeout if connection error occurs in seconds
*/
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPListener.java Thu Jan 31 17:52:27 2008
@@ -1,22 +1,109 @@
package org.apache.synapse.transport.amqp;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
+import org.apache.axis2.description.TransportInDescription;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
import org.apache.synapse.transport.base.AbstractTransportListener;
+import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSConnectionFactory;
+import org.apache.synapse.transport.jms.JMSConstants;
+import org.apache.synapse.transport.jms.JMSUtils;
public class AMQPListener extends AbstractTransportListener
{
- public static final String TRANSPORT_NAME = "jms";
+ public static final String TRANSPORT_NAME = "AMQP";
private static final Log log = LogFactory.getLog(AMQPListener.class);
+
+ /** A Map containing the AMQP connections managed by this, keyed by name */
+ private Map<String, AMQPConnection> connections = new HashMap<String, AMQPConnection>();
+ /** A Map of service name to the AMQP EPR addresses */
+ private Map serviceNameToEPRMap = new HashMap();
+
+    /**
+     * Initializes the listener: sets the transport name, delegates to the base
+     * class and loads the connection definitions from the transport description.
+     * A warning is logged when no connection is defined.
+     *
+     * @param cfgCtx      the configuration context
+     * @param transportIn the transport-in description
+     * @throws AxisFault if the base initialization fails
+     */
+    @Override
+    public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn) throws AxisFault {
+        setTransportName(TRANSPORT_NAME);
+        super.init(cfgCtx, transportIn);
+        loadConnectionDefinitions(transportIn);
+
+        if (!connections.isEmpty()) {
+            log.info("AMQP Transport Receiver/Listener initialized...");
+        } else {
+            log.warn("No AMQP connections are defined. Cannot listen on AMQP");
+        }
+    }
+
+
+
+    /**
+     * Starts every defined AMQP connection and then the base listener.
+     *
+     * @throws AxisFault if the base listener fails to start
+     */
+    @Override
+    public void start() throws AxisFault {
+        for (AMQPConnection conDef : connections.values()) {
+            conDef.start();
+        }
+        super.start();
+    }
+
+
+    /**
+     * Stops every defined AMQP connection and then the base listener. The first
+     * connection that fails to stop aborts the shutdown with an exception that
+     * names the connection.
+     * TODO: the sessions need to be cleaned up as well.
+     *
+     * @throws AxisFault if the base listener fails to stop
+     */
+    @Override
+    public void stop() throws AxisFault {
+        for (Map.Entry<String, AMQPConnection> entry : connections.entrySet()) {
+            try {
+                entry.getValue().stop();
+            } catch (Exception e) {
+                // The original message was copied from start() and spoke of "creating" a connection
+                throw new AMQPSynapseException("Error stopping the AMQP connection " + entry.getKey(), e);
+            }
+        }
+        super.stop();
+    }
+
+
+    /**
+     * Prepares the given service for listening over AMQP. Services whose name
+     * starts with "__" are ignored. A service without a usable connection is
+     * marked as faulty; otherwise its EPR is computed from its bindings and the
+     * connection URL and remembered under the service name.
+     * The bindings themselves are not created yet.
+     *
+     * @param service the service to listen for
+     */
 @Override
 protected void startListeningForService(AxisService service)
 {
- // TODO Auto-generated method stub
+        if (service.getName().startsWith("__")) {
+            return;
+        }
+
+        AMQPConnection con = getConnectionFactory(service);
+        if (con == null) {
+            // A space was missing between "specify" and "an" in the original message
+            String msg = "Service " + service.getName() + " does not specify " +
+                "an AMQP connection or refers to an invalid connection. " +
+                "This service is being marked as faulty and will not be " +
+                "available over the AMQP transport";
+            log.warn(msg);
+            BaseUtils.markServiceAsFaulty(service.getName(), msg, service.getAxisConfiguration());
+            return;
+        }
+
+        // compute service EPR and keep for later use
+        List<AMQPBinding> bindings = AMQPUtils.getBindingsForService(service);
+        serviceNameToEPRMap.put(service.getName(), URIParser.getEPR(bindings, con.getUrl()));
+
+        log.info("Starting to listen for service " + service.getName());
+
+        // TODO: create bindings for the service
 }
@Override
@@ -26,10 +113,95 @@
}
+    /**
+     * Returns the EPR recorded for the given service as a one-element array.
+     * Anything from the first '/' onwards (the operation name) is ignored when
+     * looking the service up.
+     *
+     * @param serviceName the service name, optionally followed by "/operation"
+     * @param ip          not used by this implementation
+     * @return an array holding the single EPR built from the recorded address
+     */
- public EndpointReference[] getEPRsForService(String arg0, String arg1) throws AxisFault
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault
 {
- // TODO Auto-generated method stub
- return null;
+        // Strip out the operation name
+        int slash = serviceName.indexOf('/');
+        String lookupName = (slash != -1) ? serviceName.substring(0, slash) : serviceName;
+        String address = (String) serviceNameToEPRMap.get(lookupName);
+        return new EndpointReference[]{ new EndpointReference(address) };
 }
+    /**
+     * Creates an AMQPConnection instance for each definition found among the
+     * parameters of the transport listener and adds it to the connections map,
+     * keyed by the name of the defining parameter.
+     * A definition whose nested parameters cannot be read is logged and still
+     * registered with the default settings.
+     *
+     * @param transprtIn the transport-in description for AMQP
+     */
+    private void loadConnectionDefinitions(TransportInDescription transprtIn) {
+
+        // iterate through all defined connection definitions
+        Iterator conIter = transprtIn.getParameters().iterator();
+
+        while (conIter.hasNext()) {
+            Parameter conParams = (Parameter) conIter.next();
+
+            ParameterIncludeImpl pi = new ParameterIncludeImpl();
+            AMQPConnection conDef = new AMQPConnection();
+            try {
+                pi.deserializeParameters((OMElement) conParams.getValue());
+            } catch (AxisFault axisFault) {
+                log.error("Error reading parameters for AMQP Connection definition " +
+                    conParams.getName(), axisFault);
+            }
+            // The parameter value is the OMElement read above, so casting it to
+            // String (as the original did) cannot work; the name comes from the parameter.
+            conDef.setName(conParams.getName());
+
+            Iterator params = pi.getParameters().iterator();
+            while (params.hasNext()) {
+
+                Parameter p = (Parameter) params.next();
+
+                if (AMQPConstants.CONNECTION_URL_PARAM.equals(p.getName())) {
+                    conDef.setUrl((String) p.getValue());
+                } else if (AMQPConstants.EXCHANGE_NAME_PARAM.equals(p.getName())) {
+                    conDef.setExchangeName((String) p.getValue());
+                } else if (AMQPConstants.EXCHANGE_TYPE_PARAM.equals(p.getName())) {
+                    conDef.setExchangeType((String) p.getValue());
+                }
+            }
+
+            connections.put(conDef.getName(), conDef);
+        }
+    }
+
+    /**
+     * Resolves the connection a service should use, in this order: the
+     * connection named by the service, a connection created and started from
+     * the URL given by the service (registered under the service name), and
+     * finally the default connection.
+     *
+     * @param service the AxisService
+     * @return the AMQPConnection to be used, or null if the named connection
+     *         does not exist or nothing applies
+     */
+    private AMQPConnection getConnectionFactory(AxisService service) {
+        Parameter conNameParam = service.getParameter(AMQPConstants.CONNECTION_NAME_PARAM);
+        Parameter conURLParam = service.getParameter(AMQPConstants.CONNECTION_URL_PARAM);
+
+        // An explicitly named connection wins; an unknown name yields null
+        if (conNameParam != null) {
+            String conFac = (String) conNameParam.getValue();
+            return connections.get(conFac);
+        }
+
+        // Next see if the service defines its own connection
+        if (conURLParam != null) {
+            AMQPConnection ownConnection = new AMQPConnection();
+            ownConnection.setUrl((String) conURLParam.getValue());
+            ownConnection.start();
+            connections.put(service.getName(), ownConnection);
+            return ownConnection;
+        }
+
+        // Finally fall back to the default defined in axis2.xml, if there is one
+        return connections.get(AMQPConstants.DEFAULT_CONNECTION);
+    }
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPOutTransportInfo.java Thu Jan 31 17:52:27 2008
@@ -24,9 +24,13 @@
} else {
Map props = URIParser.parse(address);
conURL = (String)props.get(AMQPConstants.CONNECTION_URL_PARAM);
- routingKey = (String)props.get(AMQPConstants.ROUTING_KEY_PARAM);
+ routingKey = (String)props.get(AMQPConstants.BINDING_ROUTING_KEY_ATTR);
exchangeName = (String)props.get(AMQPConstants.EXCHANGE_NAME_PARAM);
}
+ }
+
+ /** Returns the value of the {@code address} field. */
+ public String getAddress(){
+ return address;
 }
public String getConnectionURL(){
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPSender.java Thu Jan 31 17:52:27 2008
@@ -60,8 +60,6 @@
// If targetEPR is not null, determine the addressing info from it
if (targetEPR != null) {
amqpTransportInfo = new AMQPOutTransportInfo(targetEPR);
- // do we have a definition for a connection factory to use for this address?
-
}
// If not try to get the addressing info from the transport description
else if (outTransportInfo != null && outTransportInfo instanceof AMQPOutTransportInfo) {
@@ -86,7 +84,6 @@
}
byte[] message = null;
- String correlationId = null;
try {
message = createMessageData(msgCtx);
} catch (AMQPSynapseException e) {
@@ -119,22 +116,6 @@
deliveryProps.setExchange(amqpTransportInfo.getExchangeName());
deliveryProps.setRoutingKey(amqpTransportInfo.getRoutingKey());
- /* For efficiency I assume that the reply to exchange and destination is already created
- * If the reply is for the same service, then this should be the queue that the service is listening to
- * Blindly creating these exchanges,queues and bindings is sub optimal and can be avoid if the administrator
- * creates the nessacery exchanges,queues and bindings before hand.
- *
- * If the service hasn't specify and it's a request/reply MEP then a temporary queue
- * (which is auto-deleted) is created and bound to the amq.direct exchange.
- */
- if (msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null){
- String replyExchangeName = (String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME);
- String replyRoutingKey = msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY)!= null?(String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY):null;
-
- // for fannout exchange or some other custom exchange, the routing key maybe null
- msgProps.setReplyTo(new ReplyTo(replyExchangeName,replyRoutingKey));
- }
-
// Content type
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgCtx);
MessageFormatter messageFormatter = null;
@@ -149,10 +130,10 @@
msgProps.setContentType(contentType);
// Custom properties - SOAP ACTION
- Map<String,Object> props = new HashMap();
+ Map<String,Object> props = new HashMap<String,Object>();
if (msgCtx.isServerSide()) {
- // set SOAP Action as a property on the JMS message
+ // set SOAP Action as a property on the message
props.put(BaseConstants.SOAPACTION,(String)msgCtx.getProperty(BaseConstants.SOAPACTION));
} else {
@@ -175,6 +156,7 @@
if (AMQPConstants.AMQP_CORELATION_ID.equals(name)) {
msgProps.setCorrelationId((String) headerMap.get(AMQPConstants.AMQP_CORELATION_ID));
+ // If it's request/response, then we need to fill in corelation id and reply to properties
}
else if (AMQPConstants.AMQP_DELIVERY_MODE.equals(name)) {
Object o = headerMap.get(AMQPConstants.AMQP_DELIVERY_MODE);
@@ -213,20 +195,46 @@
}
}
}
- // If it's request/response, then we need to fill in corelation id and reply to properties
- if (waitForResponse && msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID) == null) {
- msgProps.setCorrelationId(UUIDGenerator.getUUID());
- if (msgProps.getReplyTo() == null){
- // We need to use a temp queue here.
- String tempQueueName = "Queue_" + msgProps.getCorrelationId();
- synchronized(session){
- session.queueDeclare(tempQueueName, null, null, Option.AUTO_DELETE,Option.EXCLUSIVE);
- session.queueBind(tempQueueName, "amq.direct", tempQueueName, null);
- session.sync();
- }
- msgProps.replyTo(new ReplyTo("amq.direct",tempQueueName));
- }
- }
+
+ /* For efficiency I assume that the reply-to exchange and destination are already created.
+ * If the reply is for the same service, then this should be the queue that the service is listening to.
+ * Blindly creating these exchanges, queues and bindings is suboptimal and can be avoided if the administrator
+ * creates the necessary exchanges, queues and bindings beforehand.
+ *
+ * If the service hasn't specified one and it's a request/reply MEP, then a temporary queue
+ * (which is auto-deleted) is created and bound to the amq.direct exchange.
+ */
+ if (msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null){
+ String replyExchangeName = (String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME);
+ String replyRoutingKey = msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY)!= null?(String) msgCtx.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY):null;
+
+ // for a fanout exchange or some other custom exchange, the routing key may be null
+ msgProps.setReplyTo(new ReplyTo(replyExchangeName,replyRoutingKey));
+ }
+
+ // If it's request/response, then we need to fill in reply to properties and correlation_id
+ if (waitForResponse){
+
+ if (waitForResponse && msgProps.getCorrelationId() == null) {
+ if (msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID) != null){
+ msgProps.setCorrelationId((String)msgCtx.getProperty(AMQPConstants.AMQP_CORELATION_ID));
+ }else{
+ msgProps.setCorrelationId(UUIDGenerator.getUUID());
+ }
+
+ }
+
+ if (msgProps.getReplyTo() == null){
+ //We need to use a temp queue here.
+ String tempQueueName = "Queue_" + msgProps.getCorrelationId();
+ synchronized(session){
+ session.queueDeclare(tempQueueName, null, null, Option.AUTO_DELETE,Option.EXCLUSIVE);
+ session.queueBind(tempQueueName, "amq.direct", tempQueueName, null);
+ session.sync();
+ }
+ msgProps.replyTo(new ReplyTo("amq.direct",tempQueueName));
+ }
+ }
}
private byte[] createMessageData(MessageContext msgContext){
@@ -265,7 +273,7 @@
session.messageSubscribe(msgProps.getReplyTo().getRoutingKey(),
destination,
Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
+ Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
new MessagePartListenerAdapter(listener), null, Option.NO_OPTION);
Message reply = listener.receive(timeout);
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/AMQPUtils.java Thu Jan 31 17:52:27 2008
@@ -3,19 +3,25 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.Enumeration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Topic;
+import javax.xml.namespace.QName;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpidity.api.Message;
import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.jms.JMSConnectionFactory;
import org.apache.synapse.transport.jms.JMSConstants;
public class AMQPUtils extends BaseUtils
@@ -85,7 +91,7 @@
*/
public static Map getTransportHeaders(Message message) {
// create a Map to hold transport headers
- Map map = new HashMap();
+ Map<String,Object> map = new HashMap<String,Object>();
// correlation ID
if (message.getMessageProperties().getCorrelationId() != null) {
@@ -131,4 +137,53 @@
return map;
}
+
+    /**
+     * Reads the AMQP bindings declared for a service.
+     * Each nested parameter of the bindings parameter yields one binding whose
+     * exchange type, exchange name, routing key and primary flag are taken from
+     * the attributes that are present; absent attributes keep the AMQPBinding
+     * defaults. The primary attribute marks the binding as primary unless its
+     * value is "false".
+     * When the service declares no bindings (or they cannot be read), a single
+     * default binding routed by the service name is returned.
+     *
+     * @param service the Axis Service
+     * @return the bindings of the service; never empty
+     */
+    public static List<AMQPBinding> getBindingsForService(AxisService service) {
+        List<AMQPBinding> list = new ArrayList<AMQPBinding>();
+        ParameterIncludeImpl pi = new ParameterIncludeImpl();
+
+        // A service without the bindings parameter used to fail with a NullPointerException
+        Parameter bindingsParam = service.getParameter(AMQPConstants.BINDINGS_PARAM);
+        if (bindingsParam != null) {
+            try {
+                pi.deserializeParameters((OMElement) bindingsParam.getValue());
+            } catch (AxisFault axisFault) {
+                log.error("Error reading parameters for AMQP binding definitions " +
+                    bindingsParam.getName(), axisFault);
+            }
+        }
+
+        Iterator params = pi.getParameters().iterator();
+        while (params.hasNext()) {
+            OMElement bindingElement = ((Parameter) params.next()).getParameterElement();
+            AMQPBinding binding = new AMQPBinding();
+
+            OMAttribute exchangeTypeAttr = bindingElement.getAttribute(new QName(AMQPConstants.BINDING_EXCHANGE_TYPE_ATTR));
+            OMAttribute exchangeNameAttr = bindingElement.getAttribute(new QName(AMQPConstants.BINDING_EXCHANGE_NAME_ATTR));
+            OMAttribute routingKeyAttr = bindingElement.getAttribute(new QName(AMQPConstants.BINDING_ROUTING_KEY_ATTR));
+            OMAttribute primaryAttr = bindingElement.getAttribute(new QName(AMQPConstants.BINDINGS_PRIMARY_ATTR));
+
+            // The attributes are independent of each other; the original else-if
+            // chain applied at most one of them and never the routing key.
+            if (exchangeTypeAttr != null) {
+                binding.setExchangeType(exchangeTypeAttr.getAttributeValue());
+            }
+            if (exchangeNameAttr != null) {
+                binding.setExchangeName(exchangeNameAttr.getAttributeValue());
+            }
+            if (routingKeyAttr != null) {
+                binding.setRoutingKey(routingKeyAttr.getAttributeValue());
+            }
+            if (primaryAttr != null) {
+                binding.setPrimary(!"false".equalsIgnoreCase(primaryAttr.getAttributeValue()));
+            }
+            list.add(binding);
+        }
+
+        if (list.isEmpty()) {
+            // go for the defaults
+            AMQPBinding binding = new AMQPBinding();
+            binding.setRoutingKey(service.getName());
+            list.add(binding);
+        }
+
+        return list;
+    }
+
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java?rev=617324&r1=617323&r2=617324&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/amqp/URIParser.java Thu Jan 31 17:52:27 2008
@@ -1,31 +1,78 @@
package org.apache.synapse.transport.amqp;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** sample uri formats - this is temporary until the AMQP WG defines a proper addressing scheme
*
-* uri="amqp:/direct?transport.amqp.RoutingKey=SimpleStockQuoteService&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
-* amqp:/topic?transport.amqp.RoutingKey=weather.us.ny&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672
+* uri="amqp:/direct/amq.direct?transport.amqp.RoutingKey=SimpleStockQuoteService&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672"
+* amqp:/topic/amq.topic?transport.amqp.RoutingKey=weather.us.ny&transport.amqp.ConnectionURL=qpid:virtualhost=test;client_id=foo@tcp:myhost.com:5672
*/
public class URIParser
{
+    /**
+     * Parses an AMQP address into a property map.
+     * The text between index 6 and the '?' is split at its first '/' into the
+     * exchange type (before) and the exchange name (after); both are stored
+     * under the corresponding AMQPConstants keys. Every "key=value" pair after
+     * the '?' is added as is, except that the key "connectionURL" is stored
+     * under AMQPConstants.CONNECTION_URL_PARAM.
+     *
+     * @param uri the address to parse
+     * @return the properties found in the address
+     * @throws IllegalArgumentException if the exchange type or name is blank
+     */
 public static Map parse(String uri){
 Map props = new HashMap();
- String dest = uri.substring(6,uri.indexOf("?"));
- if (dest == null || dest.trim().equals("")){
- throw new IllegalArgumentException("destination cannot be null");
+        String temp = uri.substring(6,uri.indexOf("?"));
+        String exchangeType = temp.substring(0,temp.indexOf("/"));
+        // +1 so that the separating '/' is not part of the exchange name
+        String exchangeName = temp.substring(temp.indexOf("/") + 1);
+        if (exchangeType == null || exchangeType.trim().equals("")){
+            throw new IllegalArgumentException("exchange type cannot be null");
 }
- props.put(AMQPConstants.EXCHANGE_NAME_PARAM, dest);
+        // the original re-checked exchangeType here, so the name was never validated
+        if (exchangeName == null || exchangeName.trim().equals("")){
+            throw new IllegalArgumentException("exchange name cannot be null");
+        }
+        props.put(AMQPConstants.EXCHANGE_NAME_PARAM, exchangeName);
+        props.put(AMQPConstants.EXCHANGE_TYPE_PARAM, exchangeType);
 String paramStr = uri.substring(uri.indexOf("?")+1,uri.length());
 String[] params = paramStr.split("&");
 for (String param:params){
 String key = param.substring(0,param.indexOf("="));
 String value = param.substring(param.indexOf("=")+1,param.length());
+        if("connectionURL".equals(key)){
+            key = AMQPConstants.CONNECTION_URL_PARAM;
+        }
 props.put(key, value);
 }
 return props;
+    }
+
+    /**
+     * Builds the EPR address for the given bindings and connection URL.
+     * The result is the concatenation of AMQPConstants.AMQP_PREFIX,
+     * {@code /<exchangeType>/<exchangeName>?routingKey=<routingKey>&connectionURL=<url>}.
+     *
+     * The binding flagged as primary is used (the last one if several are
+     * flagged); without a primary binding the first binding in the list is used.
+     *
+     * @param list the bindings of the service; must contain at least one entry
+     * @param url  the connection URL to embed in the address
+     * @return the EPR address
+     */
+    public static String getEPR(List<AMQPBinding> list, String url) {
+
+        AMQPBinding chosen = null;
+        for (AMQPBinding candidate : list) {
+            if (candidate.isPrimary()) {
+                chosen = candidate;
+            }
+        }
+
+        // If no primary is defined just get the first
+        if (chosen == null) {
+            chosen = list.get(0);
+        }
+
+        StringBuilder address = new StringBuilder();
+        address.append(AMQPConstants.AMQP_PREFIX).append("/").append(chosen.getExchangeType());
+        address.append("/").append(chosen.getExchangeName());
+        address.append("?").append(AMQPConstants.BINDING_ROUTING_KEY_ATTR).append("=").append(chosen.getRoutingKey());
+        address.append("&").append("connectionURL=").append(url);
+        return address.toString();
 }
public static void main(String[] args){