You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/07/24 02:35:27 UTC

svn commit: r558903 - in /incubator/qpid/branches/client_restructure/java: broker/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/exchange/synapse/ broker/s...

Author: rajith
Date: Mon Jul 23 17:35:26 2007
New Revision: 558903

URL: http://svn.apache.org/viewvc?view=rev&rev=558903
Log:
adding synapse exchange

Added:
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
Modified:
    incubator/qpid/branches/client_restructure/java/broker/pom.xml
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
    incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java

Modified: incubator/qpid/branches/client_restructure/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/pom.xml?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/broker/pom.xml Mon Jul 23 17:35:26 2007
@@ -34,14 +34,89 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
+        <!-- Synapse and related components -->
+        <synapse.version>1.0</synapse.version>
+        <stax.api.version>1.0.1</stax.api.version>
+        <activation.version>1.1</activation.version>
+   
+        <!-- Axis2 1.2 and its dependencies -->
+        <axis2.version>1.2</axis2.version>
+        <axiom.version>1.2.4</axiom.version>
+        <xml_schema.version>1.3.1</xml_schema.version>
+        <xml_apis.version>1.3.03</xml_apis.version>
     </properties>
 
     <dependencies>
+        
+        <dependency>
+            <groupId>org.apache.axis2</groupId>
+            <artifactId>axis2-kernel</artifactId>
+            <version>${axis2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.synapse</groupId>
+            <artifactId>synapse-core</artifactId>
+            <version>${synapse.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.synapse</groupId>
+            <artifactId>synapse-extensions</artifactId>
+            <version>${synapse.version}</version>
+        </dependency>
+            
+        <dependency>
+            <groupId>org.apache.ws.commons.axiom</groupId>
+            <artifactId>axiom-api</artifactId>
+            <version>${axiom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ws.commons.axiom</groupId>
+            <artifactId>axiom-impl</artifactId>
+            <version>${axiom.version}</version>
+        </dependency>
+      <!--  <dependency>
+            <groupId>org.apache.ws.commons.axiom</groupId>
+            <artifactId>axiom-dom</artifactId>
+            <version>${axiom.version}</version>
+        </dependency>
+     -->
+
+        <dependency>
+            <groupId>org.apache.ws.commons.schema</groupId>
+            <artifactId>XmlSchema</artifactId>
+            <version>${xml_schema.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+            <version>${xml_apis.version}</version>
+        </dependency>
+        
+        <dependency>
+	   <groupId>org.codehaus.woodstox</groupId>
+	   <artifactId>wstx-asl</artifactId>
+           <version>3.2.1</version>
+	</dependency>
+
+        <dependency>
+            <groupId>stax</groupId>
+            <artifactId>stax-api</artifactId>
+            <version>${stax.api.version}</version>
+        </dependency>       
+ 
+        <dependency>
+            <groupId>javax.activation</groupId>
+            <artifactId>activation</artifactId>
+            <version>${activation.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-common</artifactId>
-        </dependency>
+        </dependency>        
 
         <dependency>
             <groupId>commons-cli</groupId>

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Mon Jul 23 17:35:26 2007
@@ -94,7 +94,7 @@
                 Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
                 if (exchange == null)
                 {
-                    exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
+                    exchange = _exchangeFactory.createExchange(_exchangeRegistry,new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
                     _exchangeRegistry.registerExchange(exchange);
                 }
                 else

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Mon Jul 23 17:35:26 2007
@@ -126,7 +126,7 @@
      */
     protected abstract ExchangeMBean createMBean() throws AMQException;
 
-    public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+	public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException
     {
         _virtualHost = host;
         _name = name;
@@ -134,7 +134,10 @@
         _autoDelete = autoDelete;
         _ticket = ticket;
         _exchangeMbean = createMBean();
-        _exchangeMbean.register();
+        if(_exchangeMbean != null)
+        {
+        	_exchangeMbean.register();
+        }
     }
 
     public boolean isDurable()

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Mon Jul 23 17:35:26 2007
@@ -20,19 +20,15 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class DefaultExchangeFactory implements ExchangeFactory
 {
@@ -48,9 +44,12 @@
         _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class);
         _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class);
         _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class);
+        
+        // I'd rather allow an extension mechanism to register custom exchanges. For standard default exchanges this is fine.
+        _exchangeClassMap.put(new AMQShortString("synapse"), org.apache.qpid.server.exchange.synapse.SynapseExchange.class);
     }
 
-    public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
+    public Exchange createExchange(ExchangeRegistry exchangeRegistry,AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
                                    int ticket)
             throws AMQException
     {
@@ -62,7 +61,7 @@
         try
         {
             Exchange e = exchClass.newInstance();
-            e.initialise(_host, exchange, durable, ticket, autoDelete);
+            e.initialise(_host, exchange, durable, ticket, autoDelete, exchangeRegistry);
             return e;
         }
         catch (InstantiationException e)

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Jul 23 17:35:26 2007
@@ -32,7 +32,7 @@
     AMQShortString getName();
     AMQShortString getType();
 
-    void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+    void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException;
 
     boolean isDurable();
 

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Mon Jul 23 17:35:26 2007
@@ -26,7 +26,7 @@
 
 public interface ExchangeFactory
 {
-    Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
+    Exchange createExchange(ExchangeRegistry exchangeRegistry, AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
                             int ticket)
             throws AMQException;
 }

Added: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java?view=auto&rev=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java (added)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java Mon Jul 23 17:35:26 2007
@@ -0,0 +1,321 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.exchange.synapse;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.UnsupportedEncodingException;
+
+import javax.activation.DataHandler;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.attachments.ByteArrayDataSource;
+import org.apache.axiom.om.OMDocument;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.util.StAXUtils;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+
+/**
+ * Converts between Qpid {@link AMQMessage}s and Synapse {@link MessageContext}s.
+ * <p>
+ * The Synapse configuration and environment are held in static fields and must be
+ * supplied through {@link #setSynConfig} and {@link #setSynEnv} before
+ * {@link #getSynapseMessageContext} is called. Access to those fields is not
+ * synchronised.
+ */
+public class MessageContextCreatorForQpid
+{
+
+    private static final Log log = LogFactory.getLog(MessageContextCreatorForQpid.class);
+
+    private static SynapseConfiguration synCfg = null;
+    private static SynapseEnvironment synEnv = null;
+
+    /** Message context property holding the AMQMessage the context was built from. */
+    final static String ORIGINAL_MESSAGE = "ORIGINAL_MESSAGE";
+    /** Message context property holding the content type of the original AMQP message. */
+    final static String AMQP_CONTENT_TYPE = "AMQP_CONTENT_TYPE";
+    /** Character encoding used when converting between text and bytes. */
+    final static String DEFAULT_CHAR_SET_ENCODING = "UTF-8";
+
+    /** Prefix of the pseudo URLs produced by createURL. */
+    private static final String URL_SCHEME = "amqp://";
+
+    /** Content types this class knows how to wrap in a SOAP envelope. */
+    enum ContentType
+    {
+        TEXT_PLAIN ("text/plain"),
+        TEXT_XML ("text/xml"),
+        APPLICATION_OCTECT ("application/octet-stream");
+
+        private final String _value;
+
+        private ContentType(String value)
+        {
+            _value = value;
+        }
+
+        public String value()
+        {
+            return _value;
+        }
+    }
+
+    /**
+     * Builds the pseudo URL {@code amqp://<exchange>?routingKey=<key>} used to carry
+     * an exchange name and routing key in an endpoint reference.
+     */
+    private static String createURL(String exchangeName, String routingKey)
+    {
+        return URL_SCHEME + exchangeName + "?" + "routingKey=" + routingKey;
+    }
+
+    /**
+     * Wraps an AMQP message in a Synapse message context.
+     * <p>
+     * The payload is placed inside a default SOAP 1.1 envelope: plain text and binary
+     * content are wrapped in a {@code payload} element (binary as an MTOM
+     * attachment), XML content is parsed and added as the body's child. The original
+     * message and its content type are stored as context properties so that
+     * {@link #getAMQMessage} can rebuild an AMQP message later.
+     *
+     * @param amqMsg the message to wrap
+     * @return the populated message context
+     * @throws SynapseException if the environment has not been set, the content type is
+     *         missing or unsupported, or the XML payload cannot be read
+     */
+    public static MessageContext getSynapseMessageContext(AMQMessage amqMsg) throws SynapseException
+    {
+        if (synCfg == null || synEnv == null)
+        {
+            String msg = "Synapse environment has not initialized properly..";
+            log.fatal(msg);
+            throw new SynapseException(msg);
+        }
+
+        MessageTransferBody transferBody = amqMsg.getTransferBody();
+
+        org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext();
+        Axis2MessageContext synCtx = new Axis2MessageContext(axis2MC, synCfg, synEnv);
+        synCtx.setMessageID(transferBody.getMessageId().asString());
+        if (transferBody.getCorrelationId() != null)
+        {
+            synCtx.setRelatesTo(new RelatesTo[]{new RelatesTo(transferBody.getCorrelationId().asString())});
+        }
+
+        String exchangeName = transferBody.getExchange().asString();
+        synCtx.setTo(new EndpointReference(createURL(exchangeName, transferBody.getRoutingKey().asString())));
+        if (transferBody.getReplyTo() != null)
+        {
+            synCtx.setReplyTo(new EndpointReference(createURL(exchangeName, transferBody.getReplyTo().asString())));
+        }
+        synCtx.setDoingPOX(true);
+        synCtx.setProperty(ORIGINAL_MESSAGE, amqMsg);
+
+        // Synapse mediates SOAP envelopes, so the AMQP payload is wrapped in a synthetic one.
+        SOAPFactory soapFactory = new SOAP11Factory();
+        SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+
+        // A missing content type is reported as unsupported below rather than causing an NPE here.
+        String contentType =
+            (transferBody.getContentType() == null) ? null : transferBody.getContentType().asString();
+
+        if (ContentType.TEXT_PLAIN.value().equals(contentType))
+        {
+            OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+            OMText textData = soapFactory.createOMText(transferBody.getBody().getContentAsString());
+            wrapper.addChild(textData);
+            envelope.getBody().addChild(wrapper);
+        }
+        else if (ContentType.TEXT_XML.value().equals(contentType))
+        {
+            XMLStreamReader parser;
+            try
+            {
+                parser = StAXUtils.createXMLStreamReader(
+                        new ByteArrayInputStream(transferBody.getBody().getContentAsByteArray()),
+                        DEFAULT_CHAR_SET_ENCODING);
+            }
+            catch (XMLStreamException e)
+            {
+                throw new SynapseException("Error reading the XML message", e);
+            }
+
+            StAXOMBuilder builder = new StAXOMBuilder(parser);
+            envelope.getBody().addChild(builder.getDocumentElement());
+        }
+        else if (ContentType.APPLICATION_OCTECT.value().equals(contentType))
+        {
+            // Binary data travels as an MTOM attachment.
+            DataHandler dataHandler = new DataHandler(
+                    new ByteArrayDataSource(transferBody.getBody().getContentAsByteArray()));
+            OMText binaryData = soapFactory.createOMText(dataHandler, true);
+            OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+            wrapper.addChild(binaryData);
+            synCtx.setDoingMTOM(true);
+            envelope.getBody().addChild(wrapper);
+        }
+        else
+        {
+            throw new SynapseException("Unsupported Content Type : " + contentType);
+        }
+
+        synCtx.setProperty(AMQP_CONTENT_TYPE, contentType);
+
+        try
+        {
+            synCtx.setEnvelope(envelope);
+        }
+        catch (AxisFault e)
+        {
+            throw new SynapseException(e);
+        }
+
+        return synCtx;
+    }
+
+    /**
+     * Builds a new AMQP message from a mediated message context.
+     * <p>
+     * Most fields are copied from the original message stored under
+     * {@link #ORIGINAL_MESSAGE}; the exchange and routing key are taken from the
+     * context's "To" address, which must have the form produced by createURL, and the
+     * body is rebuilt from the first element of the SOAP body.
+     *
+     * @param mc a context created by {@link #getSynapseMessageContext}
+     * @return a new message addressed to the exchange and routing key in the "To" address
+     * @throws SynapseException if the original message is missing, the "To" address is
+     *         missing or malformed, or the payload cannot be encoded
+     */
+    public static AMQMessage getAMQMessage(MessageContext mc)
+    {
+        AMQMessage origMsg = (AMQMessage) mc.getProperty(ORIGINAL_MESSAGE);
+        if (origMsg == null)
+        {
+            throw new SynapseException("Message context does not carry the original AMQP message");
+        }
+
+        OMElement payload = mc.getEnvelope().getBody().getFirstElement();
+        String amqContentType = (String) mc.getProperty(AMQP_CONTENT_TYPE);
+        byte[] content = new byte[0];
+
+        if (ContentType.TEXT_PLAIN.value().equals(amqContentType)
+            || ContentType.TEXT_XML.value().equals(amqContentType))
+        {
+            // NOTE(review): for text/xml this keeps only the text of the first body element,
+            // so any markup is lost - confirm whether the element should be serialised instead.
+            content = encode(payload.getText());
+        }
+        // NOTE(review): application/octet-stream payloads are not converted back yet; the
+        // new message is created with an empty body.
+
+        // Very crude: there should be a utility class for this once AMQP officially
+        // converges on an addressing scheme.
+        if (mc.getTo() == null || mc.getTo().getAddress() == null)
+        {
+            throw new SynapseException("Message context has no 'To' address");
+        }
+        String url = mc.getTo().getAddress();
+        int queryStart = url.indexOf('?');
+        int keyStart = url.indexOf('=', queryStart);
+        if (queryStart < URL_SCHEME.length() || keyStart < 0)
+        {
+            throw new SynapseException("Malformed AMQP address : " + url);
+        }
+        String exchangeName = url.substring(URL_SCHEME.length(), queryStart);
+        String routingKey = url.substring(keyStart + 1);
+
+        MessageTransferBody origTransferBody = origMsg.getTransferBody();
+        MessageTransferBody transferBody = MessageTransferBody.createMethodBody(
+                origTransferBody.getMajor(),
+                origTransferBody.getMinor(),
+                origTransferBody.getAppId(), // appId
+                origTransferBody.getApplicationHeaders(), // applicationHeaders
+                new Content(Content.TypeEnum.INLINE_T, content), // body
+                origTransferBody.getContentEncoding(), // contentEncoding
+                origTransferBody.getContentType(), // contentType
+                origTransferBody.getCorrelationId(), // correlationId
+                origTransferBody.getDeliveryMode(), // deliveryMode
+                new AMQShortString(exchangeName), // destination
+                new AMQShortString(exchangeName), // exchange
+                origTransferBody.getExpiration(), // expiration
+                origTransferBody.getImmediate(), // immediate
+                origTransferBody.getMandatory(), // mandatory
+                origTransferBody.getMessageId(), // messageId
+                origTransferBody.getPriority(), // priority
+                origTransferBody.getRedelivered(), // redelivered
+                origTransferBody.getReplyTo(), // replyTo
+                new AMQShortString(routingKey), // routingKey
+                "abc".getBytes(), // securityToken - NOTE(review): hard-coded placeholder
+                origTransferBody.ticket, // ticket
+                System.currentTimeMillis(), // timestamp
+                origTransferBody.getTransactionId(), // transactionId
+                origTransferBody.getTtl(), // ttl
+                origTransferBody.getUserId() // userId
+        );
+
+        return new AMQMessage(origMsg.getMessageStore(), transferBody, origMsg.getTransactionContext());
+    }
+
+    /** Encodes text with DEFAULT_CHAR_SET_ENCODING rather than the platform default charset. */
+    private static byte[] encode(String text)
+    {
+        try
+        {
+            return text.getBytes(DEFAULT_CHAR_SET_ENCODING);
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            throw new SynapseException("Unsupported encoding : " + DEFAULT_CHAR_SET_ENCODING, e);
+        }
+    }
+
+    /** Sets the Synapse configuration used for every message context created by this class. */
+    public static void setSynConfig(SynapseConfiguration synCfg)
+    {
+        MessageContextCreatorForQpid.synCfg = synCfg;
+    }
+
+    /** Sets the Synapse environment used for every message context created by this class. */
+    public static void setSynEnv(SynapseEnvironment synEnv)
+    {
+        MessageContextCreatorForQpid.synEnv = synEnv;
+    }
+}

Added: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java?view=auto&rev=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java (added)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java Mon Jul 23 17:35:26 2007
@@ -0,0 +1,95 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.utils.EndpointDefinition;
+import org.apache.synapse.statistics.StatisticsCollector;
+
+/**
+ * Synapse environment that hands mediated messages back to the Qpid broker by
+ * routing them through the exchange registry of the owning {@link SynapseExchange}.
+ */
+public class QpidSynapseEnvironment implements SynapseEnvironment
+{
+
+    private static final Log log = LogFactory.getLog(QpidSynapseEnvironment.class);
+
+    private final SynapseConfiguration synapseConfig;
+
+    private final SynapseExchange qpidExchange;
+
+    private StatisticsCollector statisticsCollector;
+
+    /**
+     * @param synapseConfig configuration attached to every message context created here
+     * @param qpidExchange exchange whose registry is used to route outgoing messages
+     */
+    public QpidSynapseEnvironment(SynapseConfiguration synapseConfig, SynapseExchange qpidExchange)
+    {
+        this.synapseConfig = synapseConfig;
+        this.qpidExchange = qpidExchange;
+    }
+
+    /** Creates an empty Synapse message context bound to this environment. */
+    public MessageContext createMessageContext()
+    {
+        org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext();
+        return new Axis2MessageContext(axis2MC, synapseConfig, this);
+    }
+
+    public StatisticsCollector getStatisticsCollector()
+    {
+        return statisticsCollector;
+    }
+
+    /** Runs the message through the main sequence obtained from the message context. */
+    public void injectMessage(MessageContext synCtx)
+    {
+        synCtx.getMainSequence().mediate(synCtx);
+    }
+
+    /**
+     * Re-addresses the message to the given endpoint, converts it back to an AMQP
+     * message and routes it through the broker's exchange registry.
+     * <p>
+     * When no endpoint is given the message is not sent; a warning is logged so that
+     * the drop is visible.
+     *
+     * @param endpoint the endpoint whose address becomes the message's "To" address
+     * @param smc the mediated message
+     * @throws SynapseException if routing the converted message fails
+     */
+    public void send(EndpointDefinition endpoint, MessageContext smc)
+    {
+        if (endpoint == null)
+        {
+            log.warn("No endpoint given, message was not sent");
+            return;
+        }
+
+        smc.setTo(new EndpointReference(endpoint.getAddress()));
+        AMQMessage newMessage = MessageContextCreatorForQpid.getAMQMessage(smc);
+        try
+        {
+            qpidExchange.getExchangeRegistry().routeContent(newMessage);
+        }
+        catch (Exception e)
+        {
+            throw new SynapseException("Faulty endpoint", e);
+        }
+    }
+
+    public void setStatisticsCollector(StatisticsCollector statisticsCollector)
+    {
+        this.statisticsCollector = statisticsCollector;
+    }
+
+}

Added: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java?view=auto&rev=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java (added)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java Mon Jul 23 17:35:26 2007
@@ -0,0 +1,128 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.synapse.Constants;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.config.SynapseConfigurationBuilder;
+import org.apache.synapse.core.SynapseEnvironment;
+
+/**
+ * Exchange that passes every routed message through Synapse mediation instead of
+ * delivering it to bound queues. It accepts no queue bindings.
+ */
+public class SynapseExchange extends AbstractExchange
+{
+
+    public final static AMQShortString TYPE = new AMQShortString("synapse");
+
+    private SynapseEnvironment synEnv;
+
+    private ExchangeRegistry exchangeRegistry;
+
+    public SynapseExchange()
+    {
+        super();
+    }
+
+    /**
+     * Initialises the exchange and builds the Synapse configuration from the value of
+     * the {@link Constants#SYNAPSE_XML} system property.
+     * <p>
+     * The configuration and environment are also installed as the static state of
+     * {@link MessageContextCreatorForQpid}, so the most recently initialised exchange
+     * determines what that class uses.
+     */
+    @Override
+    public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException
+    {
+        super.initialise(host, name, durable, ticket, autoDelete, exchangeRegistry);
+
+        // Store the registry before the environment is published: the environment routes
+        // outgoing messages through getExchangeRegistry().
+        this.exchangeRegistry = exchangeRegistry;
+
+        String config = System.getProperty(Constants.SYNAPSE_XML);
+        SynapseConfiguration synapseConfiguration = SynapseConfigurationBuilder.getConfiguration(config);
+        synEnv = new QpidSynapseEnvironment(synapseConfiguration, this);
+        MessageContextCreatorForQpid.setSynConfig(synapseConfiguration);
+        MessageContextCreatorForQpid.setSynEnv(synEnv);
+    }
+
+    /**
+     * Returns {@code null}: this exchange has no management bean.
+     */
+    @Override
+    protected ExchangeMBean createMBean() throws AMQException
+    {
+        return null;
+    }
+
+    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    {
+        throw new UnsupportedOperationException("This exchange does not take bindings");
+    }
+
+    public AMQShortString getType()
+    {
+        return TYPE;
+    }
+
+    public boolean hasBindings() throws AMQException
+    {
+        return false;
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    {
+        throw new UnsupportedOperationException("This exchange does not take bindings");
+    }
+
+    public boolean isBound(AMQShortString routingKey) throws AMQException
+    {
+        throw new UnsupportedOperationException("This exchange does not take bindings");
+    }
+
+    public boolean isBound(AMQQueue queue) throws AMQException
+    {
+        throw new UnsupportedOperationException("This exchange does not take bindings");
+    }
+
+    public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        throw new UnsupportedOperationException("This exchange does not take bindings");
+    }
+
+    /**
+     * Converts the message to a Synapse message context and injects it into the
+     * Synapse environment for mediation.
+     *
+     * @throws AMQException wrapping any failure raised during conversion or mediation
+     */
+    public void route(AMQMessage message) throws AMQException
+    {
+        try
+        {
+            MessageContext mc = MessageContextCreatorForQpid.getSynapseMessageContext(message);
+            synEnv.injectMessage(mc);
+        }
+        catch (Exception e)
+        {
+            throw new AMQException("Error occurred while trying to mediate message through Synapse", e);
+        }
+    }
+
+    /** Returns the registry passed to initialise. */
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return exchangeRegistry;
+    }
+
+}

Added: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java?view=auto&rev=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java (added)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java Mon Jul 23 17:35:26 2007
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import javax.activation.DataHandler;
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.attachments.ByteArrayDataSource;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+/** Sample mediator that reverses the bytes of the "payload" body element and re-attaches them through a DataHandler. */
+public class TestClassMediator implements Mediator
+{
+	/** Always returns 0. */
+	public int getTraceState()
+	{
+		return 0;
+	}
+
+	/** Always returns null. */
+	public String getType()
+	{
+		return null;
+	}
+
+	/**
+	 * Replaces the text of the "payload" child of the SOAP body with its bytes in reverse order.
+	 * @param mc message context whose envelope body holds a "payload" element
+	 * @return always true
+	 */
+	public boolean mediate(MessageContext mc)
+	{
+		SOAPFactory soapFactory = new SOAP11Factory();
+		OMElement binaryNode = mc.getEnvelope().getBody().getFirstChildWithName(new QName("payload"));
+		byte[] source = binaryNode.getText().getBytes();
+
+		// Copy every byte, including index 0, so the last slot of the result is not left as a zero byte.
+		byte[] b = new byte[source.length];
+		for (int i = 0; i < source.length; i++)
+		{
+			b[i] = source[source.length - 1 - i];
+		}
+
+		// Swap the original text payload for a new "payload" element carrying the reversed bytes.
+		binaryNode.detach();
+		DataHandler dataHandler = new DataHandler(new ByteArrayDataSource(b));
+		OMText textData = soapFactory.createOMText(dataHandler, true);
+		OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+		wrapper.addChild(textData);
+		mc.setDoingMTOM(true);
+		mc.getEnvelope().getBody().addChild(wrapper);
+		return true;
+	}
+
+	/** The supplied trace state is ignored. */
+	public void setTraceState(int arg0)
+	{
+	}
+}

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Mon Jul 23 17:35:26 2007
@@ -79,7 +79,7 @@
                 {
                     try
                     {
-                        exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+                        exchange = exchangeFactory.createExchange(exchangeRegistry,body.exchange, body.type, body.durable,
                                                                   body.passive, body.ticket);
                         exchangeRegistry.registerExchange(exchange);
                     }

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java Mon Jul 23 17:35:26 2007
@@ -34,12 +34,15 @@
         define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
         define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
+        //There should be an extension mechanism to register additional exchange types such as this one
+        define(registry,factory,new AMQShortString("amq.synapse"),new AMQShortString("synapse"));
+        
         registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
     }
 
     private void define(ExchangeRegistry r, ExchangeFactory f,
                         AMQShortString name, AMQShortString type) throws AMQException
     {
-        r.registerExchange(f.createExchange(name, type, true, false, 0));
+        r.registerExchange(f.createExchange(r,name, type, true, false, 0));
     }
 }

Modified: incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jul 23 17:35:26 2007
@@ -643,4 +643,13 @@
         return _requestId;
     }
 
+    public MessageStore getMessageStore()
+    {
+    	return _store;
+    }
+    
+    public TransactionalContext getTransactionContext()
+    {
+    	return _txnContext;
+    }
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java Mon Jul 23 17:35:26 2007
@@ -27,30 +27,174 @@
 			QpidExchangeHelper exchangeHelper = session.getExchangeHelper();
 			exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS);
 			
-			QpidQueueHelper queueHelper = session.getQueueHelper();
-			queueHelper.declareQueue(false, false, false, false, false, "myQueue");
-			queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH");
+			exchangeHelper.declareExchange(false, false, QpidConstants.SYNAPSE_EXCHANGE_NAME, false, false, false, QpidConstants.SYNAPSE_EXCHANGE_CLASS);
+								
+		    //contentBasedRoutingSample(session);
 			
-			MessageHeaders msgHeaders = new MessageHeaders();
-			msgHeaders.setRoutingKey(new AMQShortString("RH"));
-			msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME));
-			AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes());
+			//transformationSample2(session);			
 			
-			QpidMessageProducer messageProducer = session.createProducer();
-			messageProducer.open();
-			messageProducer.send(false, true, msg);
-			
-			QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false);
-			messageConsumer.open();
-			
-			AMQPApplicationMessage msg2 = messageConsumer.receive();
-			System.out.println(msg.toString());
+			binaryMessageTransformations(session);
 		}
 		catch(Exception e)
 		{
 			e.printStackTrace();
 		}
 		
+	}
+	
+	public static void contentBasedRoutingSample(QpidSession session) throws Exception
+	{
+		String tmp = "<m:troubleTicket xmlns:m=\"http://redhat.com/sample\"><m:customerId>532535</m:customerId><m:priority>";
+		String tmp2 = "</m:priority><m:appId>ESB</m:appId><m:desc>blabla</m:desc></m:troubleTicket>";
+		
+		//Create queues
+		QpidQueueHelper queueHelper = session.getQueueHelper();
+		queueHelper.declareQueue(false, false, false, false, false, "criticalTicketQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "criticalTicketQueue", "criticalTicket");
+		
+		queueHelper.declareQueue(false, false, false, false, false, "lowPriorityTicketQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "lowPriorityTicketQueue", "lowPriorityTicket");
+		
+		queueHelper.declareQueue(false, false, false, false, false, "ticketQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "ticketQueue", "troubleTicket");
+		
+		QpidMessageProducer messageProducer = session.createProducer();
+		messageProducer.open();
+				
+		MessageHeaders msgHeaders = new MessageHeaders();
+		msgHeaders.setContentType(new AMQShortString("text/xml"));
+		msgHeaders.setRoutingKey(new AMQShortString("defectSystem"));
+		msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+		msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+				
+		StringBuffer buf = new StringBuffer();
+		buf.append(tmp).append("critical").append(tmp2);
+		AMQPApplicationMessage criticalMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+		System.out.println(criticalMsg.toString());
+		messageProducer.send(false, true, criticalMsg);
+		
+		buf = new StringBuffer();
+		buf.append(tmp).append("low").append(tmp2);
+		AMQPApplicationMessage lowMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+		System.out.println(lowMsg.toString());
+		messageProducer.send(false, true, lowMsg);
+		
+		buf = new StringBuffer();
+		buf.append(tmp).append("high").append(tmp2);
+		AMQPApplicationMessage highMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+		System.out.println(highMsg.toString());
+		messageProducer.send(false, true, highMsg);
+		
+		QpidMessageConsumer messageConsumerCritical = session.createConsumer("criticalTicketQueue", false, false);
+		messageConsumerCritical.open();		
+		AMQPApplicationMessage criticalMsgRcv = messageConsumerCritical.receive();
+		System.out.println(criticalMsgRcv.toString());
+		
+		QpidMessageConsumer messageConsumerLow = session.createConsumer("lowPriorityTicketQueue", false, false);
+		messageConsumerLow.open();		
+		AMQPApplicationMessage lowMsgRcv = messageConsumerLow.receive();
+		System.out.println(lowMsgRcv.toString());
+		
+		QpidMessageConsumer messageConsumer = session.createConsumer("ticketQueue", false, false);
+		messageConsumer.open();		
+		AMQPApplicationMessage msgRcv = messageConsumer.receive();
+		System.out.println(msgRcv.toString());
+		
+	}
+	
+	public static void transformationSample(QpidSession session) throws Exception
+	{
+		String tmp = "<m:quote xmlns:m=\"http://redhat.com/sample\"><m:ticker>RHT</m:ticker><m:value>125</m:value></m:quote>";
+		
+		QpidQueueHelper queueHelper = session.getQueueHelper();
+		queueHelper.declareQueue(false, false, false, false, false, "stockQuoteQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "stockQuoteQueue", "stockQuote");
+		
+		MessageHeaders msgHeaders = new MessageHeaders();
+		msgHeaders.setContentType(new AMQShortString("text/xml"));
+		msgHeaders.setRoutingKey(new AMQShortString("stockQuote"));
+		msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+		msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+		
+		AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,tmp.getBytes());
+		
+		QpidMessageProducer messageProducer = session.createProducer();
+		messageProducer.open();
+		System.out.println(msg.toString());
+		messageProducer.send(false, true, msg);
+	}
+	
+	public static void transformationSample2(QpidSession session) throws Exception
+	{
+		StringBuffer buf = new StringBuffer();
+		    buf.append("<m:vacationPackage xmlns:m=\"http://redhat.com/sample\">"); 
+			buf.append("<m:customerFirstName>Rajith</m:customerFirstName>"); 
+			buf.append("<m:customerLastName>Rajith</m:customerLastName>"); 
+			buf.append("<m:customerAddress>3349 Missississauga Road,Mississauga,ON, L5L 1J7</m:customerAddress>");   
+			buf.append("<m:customerDOB>Mississauga</m:customerDOB>");
+			buf.append("<m:paymentInfo>Visa,456454574575325325235,05122007</m:paymentInfo>");
+			buf.append("<m:start>12072007</m:start>");
+			buf.append("<m:end>18072007</m:end>");
+			buf.append("<m:airTicket>");     	
+			buf.append("<m:airline>AC</m:airline>");
+			buf.append("<m:seatPreference>W</m:seatPreference>");
+			buf.append("<m:frequentFlyer>643663345</m:frequentFlyer>");
+			buf.append("</m:airTicket>"); 
+			buf.append("<m:Hotel>");
+			buf.append("<m:noOfDays>5</m:noOfDays>");
+			buf.append("<m:rating>5</m:rating>");
+			buf.append("<m:meals>AI</m:meals>");   
+			buf.append("</m:Hotel>");
+			buf.append("<m:carRental>");
+			buf.append("<m:from>14062007</m:from>");
+			buf.append("<m:to>16062007</m:to>");
+			buf.append("</m:carRental>");
+			buf.append("</m:vacationPackage>");
+		
+		QpidQueueHelper queueHelper = session.getQueueHelper();
+		queueHelper.declareQueue(false, false, false, false, false, "carRentalQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "carRentalQueue", "carRental");
+		
+		queueHelper.declareQueue(false, false, false, false, false, "hotelQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "hotelQueue", "hotel");
+		
+		queueHelper.declareQueue(false, false, false, false, false, "airlineQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "airlineQueue", "airline");
+		
+		MessageHeaders msgHeaders = new MessageHeaders();
+		msgHeaders.setContentType(new AMQShortString("text/xml"));
+		msgHeaders.setRoutingKey(new AMQShortString("vacationPackage"));
+		msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+		msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+		
+		AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+		
+		QpidMessageProducer messageProducer = session.createProducer();
+		messageProducer.open();
+		System.out.println(msg.toString());
+		messageProducer.send(false, true, msg);
+	}
+	
+	public static void binaryMessageTransformations(QpidSession session) throws Exception
+	{
+		QpidQueueHelper queueHelper = session.getQueueHelper();
+		queueHelper.declareQueue(false, false, false, false, false, "binaryQueue");
+		queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "binaryQueue", "binary");
+		
+		MessageHeaders msgHeaders = new MessageHeaders();
+		msgHeaders.setContentType(new AMQShortString("application/octet-stream"));
+		msgHeaders.setRoutingKey(new AMQShortString("binary"));
+		msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+		msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+        
+		byte[] buf = new byte[]{72,101,108,108,111};
+		
+		AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf);
+		
+		QpidMessageProducer messageProducer = session.createProducer();
+		messageProducer.open();
+		System.out.println(msg.toString());
+		messageProducer.send(false, true, msg);		
 	}
 	
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java Mon Jul 23 17:35:26 2007
@@ -22,6 +22,10 @@
 
     public final static String FANOUT_EXCHANGE_CLASS = "fanout";
 
+    public final static String SYNAPSE_EXCHANGE_NAME = "amq.synapse";
+
+    public final static String SYNAPSE_EXCHANGE_CLASS = "synapse";
+    
 
     public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt";
 

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java Mon Jul 23 17:35:26 2007
@@ -8,56 +8,56 @@
 
 public class PhaseFactory
 {
-    /**
-     * This method will create the pipe and return a reference
-     * to the top of the pipeline.
-     * 
-     * The application can then use this (top most) phase and all
-     * calls will propogated down the pipe.
-     * 
-     * Simillar calls orginating at the bottom of the pipeline
-     * will be propogated to the top.
-     * 
-     * @param ctx
-     * @return
-     * @throws AMQPException
-     */
-    public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
-    {
-	String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
-	Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
-	List<String> list = ClientConfiguration.get().getList(key);
-	int index = 0;
-	for(String s:list)
+	/**
+	 * This method will create the pipe and return a reference
+	 * to the top of the pipeline.
+	 * 
+	 * The application can then use this (top most) phase and all
+	 * calls will be propagated down the pipe.
+	 * 
+	 * Similar calls originating at the bottom of the pipeline
+	 * will be propagated to the top.
+	 * 
+	 * @param ctx the context handed to every phase's init call
+	 * @return the last phase initialised, or null if no phases are configured
+	 * @throws AMQPException if a configured phase cannot be instantiated or its index cannot be read
+	 */
+	public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
 	{
-	    try
-	    {
-		Phase temp = (Phase)Class.forName(s).newInstance();
-		phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index +  ")." + AMQPConstants.INDEX),temp) ;
-	    }
-	    catch(Exception e)
-	    {
-		throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
-	    }    
-	    index++;
+		String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
+		Map<Integer, Phase> phaseMap = new HashMap<Integer, Phase>();
+		List<String> list = ClientConfiguration.get().getList(key);
+		int index = 0;
+		// Instantiate every configured phase and file it under its configured position.
+		for (String s : list)
+		{
+			try
+			{
+				// Each entry is used as a class name; its position comes from the matching index property.
+				Phase temp = (Phase) Class.forName(s).newInstance();
+				phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX), temp);
+			}
+			catch (Exception e)
+			{
+				// Report the class name that failed to load.
+				throw new AMQPException("Error loading phase " + s, e);
+			}
+			index++;
+		}
+
+		Phase current = null;
+		Phase prev = null;
+		//Lets build the phase pipe.
+		for (int i = 0; i < phaseMap.size(); i++)
+		{
+			current = phaseMap.get(i);
+			// The phase at the highest index has no successor, so it is initialised with a null next phase.
+			Phase next = (i + 1 < phaseMap.size()) ? phaseMap.get(i + 1) : null;
+			current.init(ctx, next, prev);
+			prev = current;
+		}
+
+		// current is the phase at the highest index, or null when nothing was configured.
+		return current;
 	}
-	
-	Phase current = null;
-	Phase prev = null;
-	Phase next = null;
-	//Lets build the phase pipe.
-	for (int i=0; i<phaseMap.size();i++)
-	{
-	   current = phaseMap.get(i);	   
-	   if (i+1 < phaseMap.size())
-	   {
-	       next = phaseMap.get(i+1);
-	   }
-	   current.init(ctx, next, prev);
-	   prev = current;
-	   next = null;
-	}
-	
-	return current;
-    }
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java Mon Jul 23 17:35:26 2007
@@ -70,7 +70,14 @@
 	public AMQPApplicationMessage receive()throws QpidException
 	{
 		checkClosed();
-		return _queue.poll();
+		try
+		{
+			return _queue.take();
+		}
+		catch (InterruptedException e)
+		{
+		throw new QpidException("Error occurred while retrieving message",e);
+		}
 	}
 	
 	public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java Mon Jul 23 17:35:26 2007
@@ -82,8 +82,8 @@
 				msgHeaders.getContentType(), //contentType
 				msgHeaders.getCorrelationId(), //correlationId
 				msgHeaders.getDeliveryMode(), //deliveryMode non persistant
-				new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
-				new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+				msgHeaders.getDestination(),// destination
+				msgHeaders.getExchange(),// exchange
 				msgHeaders.getExpiration(), //expiration
 				msgHeaders.isImmediate(), //immediate
 				msgHeaders.isMandatory(), //mandatory