You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC

svn commit: r507672 [8/16] - in /incubator/qpid/branches/qpid.0-9: gentools/src/org/apache/qpid/gentools/ gentools/templ.java/ gentools/templ.net/ java/ java/broker/ java/broker/bin/ java/broker/distribution/ java/broker/distribution/src/ java/broker/d...

Modified: incubator/qpid/branches/qpid.0-9/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/pom.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/pom.xml Wed Feb 14 12:02:03 2007
@@ -35,40 +35,45 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
+        <java.source.version>1.5</java.source.version>
+        <qpid.version>${pom.version}</qpid.version>
+        <qpid.targetDir>${project.build.directory}</qpid.targetDir>
     </properties>
 
     <dependencies>
+
         <dependency>
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-common</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-broker</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jms_1.1_spec</artifactId>
         </dependency>
+
         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
         </dependency>
+
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>
+
         <dependency>
             <groupId>org.apache.mina</groupId>
             <artifactId>mina-filter-ssl</artifactId>
         </dependency>
 
+	<!-- Test Dependencies -->
+        <dependency> <!-- for inVm Broker -->
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>jmscts</groupId>
             <artifactId>jmscts</artifactId>
@@ -85,12 +90,14 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.easymock</groupId>
             <artifactId>easymockclassextension</artifactId>
+            <scope>test</scope>
         </dependency>
-
     </dependencies>
 
     <build>
@@ -115,11 +122,67 @@
                         </property>
                         <property>
                             <name>log4j.configuration</name>
-                            <value>file:///${basedir}/src/main/java/log4j.properties</value>
+                            <value>file:///${basedir}/src/main/java/client.log4j</value>
                         </property>
                     </systemProperties>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
         </plugins>
+
+<!-- The inclusion of this resource causes the build to hang. -->
+    <!--resources>
+        <resource>
+            <targetPath>META-INF/</targetPath>
+            <filtering>false</filtering>
+            <directory>../resources/META-INF</directory>
+            <includes>
+                <include>**</include>
+            </includes>
+        </resource>
+    </resources-->
+
+    <testResources>
+        <testResource>
+            <targetPath>META-INF/</targetPath>
+            <filtering>false</filtering>
+            <directory>../resources/META-INF</directory>
+            <includes>
+                <include>**</include>
+            </includes>
+        </testResource>
+        <testResource>
+            <targetPath>src/</targetPath>
+            <filtering>false</filtering>
+            <directory>src/test/java</directory>
+            <includes>
+                <include>**/*.java</include>
+            </includes>
+        </testResource>
+
+        <testResource>
+            <targetPath></targetPath>
+            <filtering>false</filtering>
+            <directory>src/main/java</directory>
+            <includes>
+                <include>client.log4j</include>
+            </includes>
+        </testResource>
+    </testResources>
+
     </build>
+
 </project>

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Wed Feb 14 12:02:03 2007
@@ -308,7 +308,7 @@
             }
         }
 
-        //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+        //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
         optionsURL.deleteCharAt(optionsURL.length() - 1);
 
         return optionsURL.toString();
@@ -334,4 +334,15 @@
     }
 
 
+    public static String checkTransport(String broker)
+    {
+        if ((!broker.contains("://")))
+        {
+            return "tcp://" + broker;
+        }
+        else
+        {
+            return broker;
+        }
+    }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Feb 14 12:02:03 2007
@@ -30,6 +30,7 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -156,21 +157,30 @@
 
 
     // Keeps a tally of connections for logging and debugging
-    private static AtomicInteger _ConnectionId;    
-    static { _ConnectionId = new AtomicInteger(0); }
+    private static AtomicLong _ConnectionId;    
+    static { _ConnectionId = new AtomicLong(0); }
 
     /*
      * The connection meta data
      */
     private QpidConnectionMetaData _connectionMetaData;
 
+    /**
+     * @param broker      brokerdetails
+     * @param username    username
+     * @param password    password
+     * @param clientName  clientid
+     * @param virtualHost virtualhost
+     * @throws AMQException
+     * @throws URLSyntaxException
+     */
     public AMQConnection(String broker, String username, String password,
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
         this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
                                   username + ":" + password + "@" +
-                                  (clientName==null?"":clientName) +
-                                  virtualHost + "?brokerlist='" + broker + "'"));
+                                  (clientName == null ? "" : clientName) + "/" +
+                                  virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
     }
 
     public AMQConnection(String host, int port, String username, String password,
@@ -185,12 +195,12 @@
         this(new AMQConnectionURL(useSSL ?
                                   ConnectionURL.AMQ_PROTOCOL + "://" +
                                   username + ":" + password + "@" +
-                                  (clientName==null?"":clientName) +
+                                  (clientName == null ? "" : clientName) +
                                   virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
                                   + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
                                                                                 ConnectionURL.AMQ_PROTOCOL + "://" +
                                                                                 username + ":" + password + "@" +
-                                                                                (clientName==null?"":clientName) +
+                                                                                (clientName == null ? "" : clientName) +
                                                                                 virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
                                                                                 + "," + ConnectionURL.OPTIONS_SSL + "='false'"
         ));
@@ -215,7 +225,7 @@
         _clientName = connectionURL.getClientName();
         _username = connectionURL.getUsername();
         _password = connectionURL.getPassword();
-        _virtualHost = connectionURL.getVirtualHost();
+        setVirtualHost(connectionURL.getVirtualHost());
 
         _failoverPolicy = new FailoverPolicy(connectionURL);
 
@@ -326,6 +336,15 @@
         _clientName = clientName;
         _username = username;
         _password = password;
+        setVirtualHost(virtualHost);
+    }
+
+    private void setVirtualHost(String virtualHost)
+    {
+        if(virtualHost.startsWith("/"))
+        {
+            virtualHost = virtualHost.substring(1);
+        }
         _virtualHost = virtualHost;
     }
 
@@ -410,6 +429,12 @@
         return _failoverPolicy.failoverAllowed();
     }
 
+    // Test purposes only - used for testing refs, which cannot be done using JMS interfaces
+    public AMQSession createAMQSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        return (AMQSession)createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+    }
+
     public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
     {
         return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
@@ -484,20 +509,21 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(channelId,
-            ChannelOpenBody.createMethodBody((byte)0, (byte)9,	// AMQP version (major, minor)
+            ChannelOpenBody.createMethodBody(
+                _protocolHandler.getProtocolMajorVersion(), // AMQP major version
+                _protocolHandler.getProtocolMinorVersion(), // AMQP minor version
                 null),	// outOfBand
                 ChannelOpenOkBody.class);
 
         //todo send low water mark when protocol allows.
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+
         // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(channelId,
-		MessageQosBody.createMethodBody((byte)0, (byte)9,	// AMQP version (major, minor)
+		    MessageQosBody.createMethodBody(
+                _protocolHandler.getProtocolMajorVersion(), // AMQP major version
+                _protocolHandler.getProtocolMinorVersion(), // AMQP minor version
                 false,	// global
                 prefetchHigh,	// prefetchCount
                 0),	// prefetchSize
@@ -509,10 +535,11 @@
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            _protocolHandler.syncWrite(channelId, TxSelectBody.createMethodBody((byte)0, (byte)9), TxSelectOkBody.class);
+            _protocolHandler.syncWrite(channelId, TxSelectBody.createMethodBody(
+                _protocolHandler.getProtocolMajorVersion(), // AMQP major version
+                _protocolHandler.getProtocolMinorVersion()), // AMQP minor version
+                TxSelectOkBody.class);
         }
     }
 
@@ -544,6 +571,7 @@
     /**
      * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
      * where specified in the JMS spec
+     *
      * @param transacted
      * @param acknowledgeMode
      * @return QueueSession
@@ -557,6 +585,7 @@
     /**
      * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
      * where specified in the JMS spec
+     *
      * @param transacted
      * @param acknowledgeMode
      * @return TopicSession
@@ -591,7 +620,7 @@
     {
         checkNotClosed();
         return _connectionMetaData;
-        
+
     }
 
     public ExceptionListener getExceptionListener() throws JMSException
@@ -642,13 +671,18 @@
 
     public void close() throws JMSException
     {
-        synchronized(getFailoverMutex())
+        close(-1);
+    }
+
+    public void close(long timeout) throws JMSException
+    {
+        synchronized (getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
                 try
                 {
-                    closeAllSessions(null);
+                    closeAllSessions(null, timeout);
                     _protocolHandler.closeConnection();
                 }
                 catch (AMQException e)
@@ -686,7 +720,7 @@
      *              <p/>
      *              The caller must hold the failover mutex before calling this method.
      */
-    private void closeAllSessions(Throwable cause) throws JMSException
+    private void closeAllSessions(Throwable cause, long timeout) throws JMSException
     {
         final LinkedList sessionCopy = new LinkedList(_sessions.values());
         final Iterator it = sessionCopy.iterator();
@@ -702,7 +736,7 @@
             {
                 try
                 {
-                    session.close();
+                    session.close(timeout);
                 }
                 catch (JMSException e)
                 {
@@ -920,7 +954,7 @@
         {
             if (cause instanceof AMQException)
             {
-                je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause);
+                je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause);
             }
             else
             {
@@ -951,7 +985,7 @@
             {
                 _logger.info("Closing AMQConnection due to :" + cause.getMessage());
                 _closed.set(true);
-                closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+                closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
             }
             catch (JMSException e)
             {
@@ -973,8 +1007,8 @@
     void deregisterSession(int channelId)
     {
         _sessions.remove(channelId);
-    }    
-    
+    }
+
     /**
      * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
      * The caller must hold the failover mutex before calling this method.
@@ -982,7 +1016,7 @@
     public void resubscribeSessions() throws JMSException, AMQException
     {
         ArrayList sessions = new ArrayList(_sessions.values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove?
+        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
             AMQSession s = (AMQSession) it.next();
@@ -1025,7 +1059,7 @@
                 null);          // factory location
     }
     
-    public int getConnectionId()
+    public long getConnectionId()
     {
         return _ConnectionId.get();
     }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.url.URLSyntaxException;
 import org.apache.qpid.url.URLHelper;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.naming.Reference;
 import javax.naming.NamingException;
@@ -35,19 +36,31 @@
 
 public abstract class AMQDestination implements Destination, Referenceable
 {
-    protected final String _exchangeName;
+    protected final AMQShortString _exchangeName;
 
-    protected final String _exchangeClass;
+    protected final AMQShortString _exchangeClass;
 
-    protected final String _destinationName;
+    protected final AMQShortString _destinationName;
 
-    protected boolean _isDurable;
+    protected final boolean _isDurable;
 
     protected final boolean _isExclusive;
 
     protected final boolean _isAutoDelete;
 
-    protected String _queueName;
+    private AMQShortString _queueName;
+
+    private String _url;
+    private AMQShortString _urlAsShortString;
+
+    private byte[] _byteEncoding;
+    private static final int IS_DURABLE_MASK = 0x1;
+    private static final int IS_EXCLUSIVE_MASK = 0x2;
+    private static final int IS_AUTODELETE_MASK = 0x4;
+
+    public static final byte QUEUE_TYPE = 1;
+    public static final byte TOPIC_TYPE = 2;
+    public static final byte UNKNOWN_TYPE = 3;
 
     protected AMQDestination(String url) throws URLSyntaxException
     {
@@ -63,27 +76,27 @@
         _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
         _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
         _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
-        _queueName = binding.getQueueName();
+        _queueName = new AMQShortString(binding.getQueueName());
     }
 
-    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName)
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
     {
         this(exchangeName, exchangeClass, destinationName, false, false, queueName);
     }
 
-    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName)
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
     {
         this(exchangeName, exchangeClass, destinationName, false, false, null);
     }
 
-    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
-                             boolean isAutoDelete, String queueName)
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+                             boolean isAutoDelete, AMQShortString queueName)
     {
         this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
     }
 
-    protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
-                             boolean isAutoDelete, String queueName, boolean isDurable)
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+                             boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
     {
         if (destinationName == null)
         {
@@ -106,9 +119,13 @@
         _isDurable = isDurable;
     }
 
-    public String getEncodedName()
+    public AMQShortString getEncodedName()
     {
-        return toURL();
+        if(_urlAsShortString == null)
+        {
+            toURL();
+        }
+        return _urlAsShortString;
     }
 
     public boolean isDurable()
@@ -116,12 +133,12 @@
         return _isDurable;
     }
 
-    public String getExchangeName()
+    public AMQShortString getExchangeName()
     {
         return _exchangeName;
     }
 
-    public String getExchangeClass()
+    public AMQShortString getExchangeClass()
     {
         return _exchangeClass;
     }
@@ -136,22 +153,34 @@
         return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
     }
 
-    public String getDestinationName()
+    public AMQShortString getDestinationName()
     {
         return _destinationName;
     }
 
     public String getQueueName()
     {
+        return _queueName == null ? null : _queueName.toString();
+    }
+
+    public AMQShortString getAMQQueueName()
+    {
         return _queueName;
     }
 
-    public void setQueueName(String queueName)
+
+
+    public void setQueueName(AMQShortString queueName)
     {
+
         _queueName = queueName;
+        // calculated URL now out of date
+        _url = null;
+        _urlAsShortString = null;
+        _byteEncoding = null;
     }
 
-    public abstract String getRoutingKey();
+    public abstract AMQShortString getRoutingKey();
 
     public boolean isExclusive()
     {
@@ -179,53 +208,114 @@
 
     public String toURL()
     {
-        StringBuffer sb = new StringBuffer();
-
-        sb.append(_exchangeClass);
-        sb.append("://");
-        sb.append(_exchangeName);
-
-        sb.append("/");
-
-        if (_destinationName != null)
+        String url = _url;
+        if(url == null)
         {
-            sb.append(_destinationName);
-        }
 
-        sb.append("/");
 
-        if (_queueName != null)
-        {
-            sb.append(_queueName);
-        }
+            StringBuffer sb = new StringBuffer();
 
-        sb.append("?");
+            sb.append(_exchangeClass);
+            sb.append("://");
+            sb.append(_exchangeName);
+
+            sb.append('/');
+
+            if (_destinationName != null)
+            {
+                sb.append(_destinationName);
+            }
+
+            sb.append('/');
+
+            if (_queueName != null)
+            {
+                sb.append(_queueName);
+            }
+
+            sb.append('?');
+
+            if (_isDurable)
+            {
+                sb.append(BindingURL.OPTION_DURABLE);
+                sb.append("='true'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+
+            if (_isExclusive)
+            {
+                sb.append(BindingURL.OPTION_EXCLUSIVE);
+                sb.append("='true'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+
+            if (_isAutoDelete)
+            {
+                sb.append(BindingURL.OPTION_AUTODELETE);
+                sb.append("='true'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+
+            //removeKey the last char '?' if there is no options , ',' if there are.
+            sb.deleteCharAt(sb.length() - 1);
+            url = sb.toString();
+            _url = url;
+            _urlAsShortString = new AMQShortString(url);
+        }
+        return url;
+    }
+
+    public byte[] toByteEncoding()
+    {
+        byte[] encoding = _byteEncoding;
+        if(encoding == null)
+        {
+            int size = _exchangeClass.length() + 1 +
+                       _exchangeName.length() + 1 +
+                       (_destinationName == null ? 0 : _destinationName.length()) + 1 +
+                       (_queueName == null ? 0 : _queueName.length()) + 1 +
+                       1;
+            encoding = new byte[size];
+            int pos = 0;
+
+            pos = _exchangeClass.writeToByteArray(encoding, pos);
+            pos = _exchangeName.writeToByteArray(encoding, pos);
+            if(_destinationName == null)
+            {
+                encoding[pos++] = (byte)0;
+            }
+            else
+            {
+                pos = _destinationName.writeToByteArray(encoding,pos);
+            }
+            if(_queueName == null)
+            {
+                encoding[pos++] = (byte)0;
+            }
+            else
+            {
+                pos = _queueName.writeToByteArray(encoding,pos);
+            }
+            byte options = 0;
+            if(_isDurable)
+            {
+                options |= IS_DURABLE_MASK;
+            }
+            if(_isExclusive)
+            {
+                options |= IS_EXCLUSIVE_MASK;
+            }
+            if(_isAutoDelete)
+            {
+                options |= IS_AUTODELETE_MASK;
+            }
+            encoding[pos] = options;
 
-        if (_isDurable)
-        {
-            sb.append(BindingURL.OPTION_DURABLE);
-            sb.append("='true'");
-            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
-        }
 
-        if (_isExclusive)
-        {
-            sb.append(BindingURL.OPTION_EXCLUSIVE);
-            sb.append("='true'");
-            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
-        }
+            _byteEncoding = encoding;
 
-        if (_isAutoDelete)
-        {
-            sb.append(BindingURL.OPTION_AUTODELETE);
-            sb.append("='true'");
-            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
         }
-
-        //remove the last char '?' if there is no options , ',' if there are.
-        sb.deleteCharAt(sb.length() - 1);
-
-        return sb.toString();
+        return encoding;
     }
 
     public boolean equals(Object o)
@@ -258,7 +348,7 @@
         {
             return false;
         }
-        if (_isExclusive != that._isExclusive)
+  /*      if (_isExclusive != that._isExclusive)
         {
             return false;
         }
@@ -266,6 +356,7 @@
         {
             return false;
         }
+        */
         return true;
     }
 
@@ -279,8 +370,8 @@
         {
             result = 29 * result + _queueName.hashCode();
         }
-        result = result * (_isExclusive ? 13 : 7);
-        result = result * (_isAutoDelete ? 13 : 7);
+//        result = result * (_isExclusive ? 13 : 7);
+//        result = result * (_isAutoDelete ? 13 : 7);
         return result;
     }
 
@@ -293,9 +384,55 @@
                 null);          // factory location
     }
 
+
+    public static Destination createDestination(byte[] byteEncodedDestination)
+    {
+        AMQShortString exchangeClass;
+        AMQShortString exchangeName;
+        AMQShortString destinationName;
+        AMQShortString queueName;
+        boolean isDurable;
+        boolean isExclusive;
+        boolean isAutoDelete;
+
+        int pos = 0;
+        exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+        pos+= exchangeClass.length() + 1;
+        exchangeName =  AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+        pos+= exchangeName.length() + 1;
+        destinationName =  AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+        pos+= (destinationName == null ? 0 : destinationName.length()) + 1;
+        queueName =  AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+        pos+= (queueName == null ? 0 : queueName.length()) + 1;
+        int options = byteEncodedDestination[pos];
+        isDurable = (options & IS_DURABLE_MASK) != 0;
+        isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
+        isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
+
+        if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+        {
+            return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+        }
+        else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+        {
+            return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable);
+        }
+        else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
+        {
+            return new AMQHeadersExchange(destinationName);
+        }
+        else
+        {
+            throw new IllegalArgumentException("Unknown Exchange Class:" + exchangeClass);
+        }
+
+
+
+    }
+
     public static Destination createDestination(BindingURL binding)
     {
-        String type = binding.getExchangeClass();
+        AMQShortString type = binding.getExchangeClass();
 
         if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
         {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.framing.AMQShortString;
 
 /**
  * A destination backed by a headers exchange
@@ -33,12 +34,17 @@
         this(binding.getExchangeName());
     }
 
-    public AMQHeadersExchange(String queueName)
+    public AMQHeadersExchange(String name)
+    {
+        this(new AMQShortString(name));
+    }
+
+    public AMQHeadersExchange(AMQShortString queueName)
     {
         super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
     }
 
-    public String getRoutingKey()
+    public AMQShortString getRoutingKey()
     {
         return getDestinationName();
     }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.Queue;
 
@@ -42,11 +43,27 @@
      * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
      * @param name the name of the queue
      */
-    public AMQQueue(String name)
+    public AMQQueue(AMQShortString name)
     {
         this(name, false);
     }
 
+    public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
+    {
+        super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+              false, queueName, false);    }
+
+
+    /**
+     * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+     * @param name the name of the queue
+     */
+    public AMQQueue(String name)
+    {
+        this(new AMQShortString(name), false);
+    }
+
+
     /**
      * Create a queue with a specified name.
      *
@@ -56,10 +73,23 @@
      */
     public AMQQueue(String name, boolean temporary)
     {
+        this(new AMQShortString(name),temporary);
+    }
+
+
+    /**
+     * Create a queue with a specified name.
+     *
+     * @param name the destination name (used in the routing key)
+     * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
+     * and exclusive
+     */
+    public AMQQueue(AMQShortString name, boolean temporary)
+    {
         // queue name is set to null indicating that the broker assigns a name in the case of temporary queues
         // temporary queues are typically used as response queues
-        this(name, temporary?null:name, temporary, temporary);
-        _isDurable = !temporary;
+        this(name, temporary?null:name, temporary, temporary, !temporary);
+        
     }
 
     /**
@@ -69,16 +99,22 @@
      * @param exclusive true if the queue should only permit a single consumer
      * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
      */
-    public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete)
+    public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+    {
+        this(destinationName, queueName, exclusive, autoDelete, false);
+    }
+
+
+    public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
     {
         super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
-              autoDelete, queueName);
+              autoDelete, queueName, durable);
     }
 
   
-    public String getRoutingKey()
+    public AMQShortString getRoutingKey()
     {
-        return getQueueName();
+        return getAMQQueueName();
     }
 
     public boolean isNameRequired()

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Feb 14 12:02:03 2007
@@ -20,16 +20,6 @@
  */
 package org.apache.qpid.client;
 
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -60,16 +50,15 @@
 import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.ChannelFlowBody;
@@ -93,10 +82,19 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.handler.ExchangeBoundHandler;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
     private static final Logger _logger = Logger.getLogger(AMQSession.class);
@@ -115,6 +113,10 @@
     private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
+    private MessageListener _messageListener = null;
+
+    private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
     /**
      * Used to reference durable subscribers so they requests for unsubscribe can be handled
      * correctly.  Note this only keeps a record of subscriptions which have been created
@@ -137,8 +139,11 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
+    // working
     private ConcurrentLinkedQueue<Long> _unacknowledged = new ConcurrentLinkedQueue();
-
+    // merge-right.r501854
+    private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
+    
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
@@ -151,7 +156,7 @@
     /**
      * Maps from consumer tag (String) to JMSMessageConsumer instance
      */
-    private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
+    private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /**
      * Maps from destination to count of JMSMessageConsumers
@@ -186,16 +191,27 @@
     private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
 
     /**
+     * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+     */
+    private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
+
+    /**
+     * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+     */
+    private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
+
+    /**
      * Set when recover is called. This is to handle the case where recover() is called by application code
      * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
      */
     private boolean _inRecovery;
 
-
+    private boolean _hasMessageListeners;
 
     /**
      * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
      */
+    
     private class Dispatcher extends Thread
     {
         public Dispatcher()
@@ -224,57 +240,18 @@
 
         private void dispatchMessage(UnprocessedMessage message)
         {
-            if (message.contents != null)
-            {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.contentHeader.getDestination());
+            final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getMessageHeaders().getDestination());
 
-                if (consumer == null)
-                {
-                    _logger.warn("Received a message from queue " + message.contentHeader.getDestination() + " without a handler - ignoring...");
-                    _logger.warn("Consumers that exist: " + _consumers);
-                    _logger.warn("Session hashcode: " + System.identityHashCode(this));
-                }
-                else
-                {
-
-                    consumer.notifyMessage(message, _channelId);
-
-                }
+            if (consumer == null)
+            {
+                _logger.warn("Received a message from queue " + message.getMessageHeaders().getDestination() + " without a handler - ignoring...");
+                _logger.warn("Consumers that exist: " + _consumers);
+                _logger.warn("Session hashcode: " + System.identityHashCode(this));
             }
-            /*else
+            else
             {
-                try
-                {
-                    // Bounced message is processed here, away from the mina thread
-                    AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
-                                                                                              false,
-                                                                                              message.contentHeader,
-                                                                                              message.content);
-
-                    int errorCode = message.bounceBody.replyCode;
-                    String reason = message.bounceBody.replyText;
-                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
-                    //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                    if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
-                    {
-                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
-                    }
-                    else if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                    {
-                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                    }
-                    else
-                    {
-                        _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                    }
-
-                }
-                catch (Exception e)
-                {
-                    _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
-                }
-            }*/
+                consumer.notifyMessage(message, _channelId);
+            }
         }
 
         public void stopDispatcher()
@@ -284,6 +261,8 @@
         }
     }
 
+
+
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry)
     {
@@ -314,6 +293,8 @@
         _defaultPrefetchHighMark = defaultPrefetchHighMark;
         _defaultPrefetchLowMark = defaultPrefetchLowMark;
 
+        _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
+
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -374,136 +355,69 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
-            try
-            {
-                return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create message: " + e);
-            }
+            return new JMSBytesMessage();
         }
     }
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
-            try
-            {
-                return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create message: " + e);
-            }
+            return new JMSMapMessage();
         }
     }
 
     public javax.jms.Message createMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-            try
-            {
-                return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create message: " + e);
-            }
-        }
+        return createBytesMessage();
     }
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
-            try
-            {
-                return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create message: " + e);
-            }
+            return (ObjectMessage) new JMSObjectMessage();
         }
     }
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-            try
-            {
-                ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
-                msg.setObject(object);
-                return msg;
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create message: " + e);
-            }
-        }
+        ObjectMessage msg = createObjectMessage();
+        msg.setObject(object);
+        return msg;
     }
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
 
-            try
-            {
-                return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create text message: " + e);
-            }
+            return new JMSStreamMessage();
         }
     }
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
 
-            try
-            {
-                return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create text message: " + e);
-            }
+            return new JMSTextMessage();
         }
     }
 
     public TextMessage createTextMessage(String text) throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-            try
-            {
-                TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
-                msg.setText(text);
-                return msg;
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Unable to create text message: " + e);
-            }
-        }
+
+        TextMessage msg = createTextMessage();
+        msg.setText(text);
+        return msg;
     }
 
     public boolean getTransacted() throws JMSException
@@ -532,10 +446,11 @@
             }
 
             // Commits outstanding messages sent and outstanding acknowledgements.
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            _connection.getProtocolHandler().syncWrite(_channelId, TxCommitBody.createMethodBody((byte)0, (byte)9), TxCommitOkBody.class);
+            _connection.getProtocolHandler().syncWrite(_channelId, TxCommitBody.createMethodBody(
+                getProtocolMajorVersion(),
+                getProtocolMinorVersion()),
+                TxCommitOkBody.class);
         }
         catch (AMQException e)
         {
@@ -545,17 +460,18 @@
         }
     }
 
+    
     public void rollback() throws JMSException
     {
         checkTransacted();
         try
         {
             _unacknowledged.clear();
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            _connection.getProtocolHandler().syncWrite(_channelId, 
-                    TxRollbackBody.createMethodBody((byte)0, (byte)9), TxRollbackOkBody.class);
+            _connection.getProtocolHandler().syncWrite(_channelId, TxRollbackBody.createMethodBody(
+                getProtocolMajorVersion(),
+                getProtocolMinorVersion()),
+                TxRollbackOkBody.class);
         }
         catch (AMQException e)
         {
@@ -565,9 +481,14 @@
 
     public void close() throws JMSException
     {
+        close(-1);
+    }
+
+    public void close(long timeout) throws JMSException
+    {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             //Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
@@ -578,15 +499,14 @@
                 try
                 {
                     _connection.getProtocolHandler().closeSession(this);
-        	        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        	        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         	        // Be aware of possible changes to parameter order as versions change.
                     final AMQMethodBody methodBody = ChannelCloseBody.createMethodBody(
-                        (byte)0, (byte)9,	// AMQP version (major, minor)
+                        getProtocolMajorVersion(),
+                        getProtocolMinorVersion(),
                         0,	// classId
                         0,	// methodId
                         AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
-                        "JMS client closing channel");	// replyText
+                        new AMQShortString("JMS client closing channel"));	// replyText
                     _connection.getProtocolHandler().syncWrite(getChannelId(), methodBody, ChannelCloseOkBody.class);
                     // When control resumes at this point, a reply will have been received that
                     // indicates the broker has closed the channel successfully
@@ -606,13 +526,31 @@
         }
     }
 
+    private AMQProtocolHandler getProtocolHandler()
+    {
+        return _connection.getProtocolHandler();
+    }
+
+
+    private byte getProtocolMinorVersion()
+    {
+        return getProtocolHandler().getProtocolMinorVersion();
+    }
+
+    private byte getProtocolMajorVersion()
+    {
+        return getProtocolHandler().getProtocolMajorVersion();
+    }
+
+
     /**
      * Close all producers or consumers. This is called either in the error case or when closing the session normally.
      *
      * @param amqe the exception, may be null to indicate no error has occurred
      */
-    private void closeProducersAndConsumers(AMQException amqe)
+    private void closeProducersAndConsumers(AMQException amqe) throws JMSException
     {
+        JMSException jmse = null;
         try
         {
             closeProducers();
@@ -620,6 +558,7 @@
         catch (JMSException e)
         {
             _logger.error("Error closing session: " + e, e);
+            jmse = e;
         }
         try
         {
@@ -628,7 +567,19 @@
         catch (JMSException e)
         {
             _logger.error("Error closing session: " + e, e);
+            if (jmse == null)
+            {
+                jmse = e;
+            }
+        }
+        finally
+        {
+            if (jmse != null)
+            {
+                throw jmse;
+            }
         }
+
     }
 
     /**
@@ -637,9 +588,9 @@
      *
      * @param e the exception that caused this session to be closed. Null causes the
      */
-    public void closed(Throwable e)
+    public void closed(Throwable e) throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will be passed in when closure occurs as a
             // result of a channel close request
@@ -777,14 +728,17 @@
             consumer.clearUnackedMessages();
         }
         _unacknowledged.clear();
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        try {
+        try
+        {
 			_connection.getProtocolHandler().writeRequest(_channelId,
-			    MessageRecoverBody.createMethodBody((byte)0, (byte)9,	// AMQP version (major, minor)
+			    MessageRecoverBody.createMethodBody(
+                    getProtocolMajorVersion(),
+                    getProtocolMinorVersion(),
 			        false));	// requeue
-		} catch (AMQException e) {
+		}
+        catch (AMQException e)
+        {
 			_logger.error("Error recovering",e);
 			JMSException ex = new JMSException("Error Recovering");
 			ex.initCause(e);
@@ -820,13 +774,37 @@
     public MessageListener getMessageListener() throws JMSException
     {
         checkNotClosed();
-        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+        return _messageListener;
     }
 
     public void setMessageListener(MessageListener listener) throws JMSException
     {
         checkNotClosed();
-        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+
+        if (!isStopped())
+        {
+            throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+        }
+
+        // We are stopped         
+        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+        {
+            BasicMessageConsumer consumer = i.next();
+
+            if (consumer.isReceiving())
+            {
+                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+            }
+        }
+
+        _messageListener = listener;
+        
+        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+        {
+            i.next().setMessageListener(_messageListener);
+        }
+
+
     }
 
     public void run()
@@ -858,6 +836,12 @@
         return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
     }
 
+    // Test purposes only - used for testing refs, which cannot be done using JMS interfaces
+    public BasicMessageProducer createBasicProducer(Topic destination) throws JMSException
+    {
+        return (BasicMessageProducer)createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+    }
+
     private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory,
                                                                    boolean immediate)
             throws JMSException
@@ -883,7 +867,10 @@
                     registerProducer(producerId, producer);
                     return producer;
                 }
-                catch (AMQException e) { throw new JMSException(e.toString()); }
+                catch (AMQException e)
+                {
+                    throw new JMSException(e.toString());
+                }
             }
         }.execute(_connection);
     }
@@ -964,8 +951,8 @@
     }
 
     public MessageConsumer createBrowserConsumer(Destination destination,
-                                         String messageSelector,
-                                         boolean noLocal)
+                                                 String messageSelector,
+                                                 boolean noLocal)
             throws JMSException
     {
         checkValidDestination(destination);
@@ -1039,6 +1026,7 @@
     {
         checkTemporaryDestination(destination);
 
+
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
             public Object operation() throws JMSException
@@ -1047,7 +1035,7 @@
 
                 AMQDestination amqd = (AMQDestination) destination;
 
-                final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+                final AMQProtocolHandler protocolHandler = getProtocolHandler();
                 // TODO: construct the rawSelector from the selector string if rawSelector == null
                 final FieldTable ft = FieldTableFactory.newFieldTable();
                 //if (rawSelector != null)
@@ -1061,6 +1049,11 @@
                                                                          protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          _acknowledgeMode, noConsume, autoClose);
 
+                if (_messageListener != null)
+                {
+                    consumer.setMessageListener(_messageListener);
+                }
+
                 try
                 {
                     registerConsumer(consumer, false);
@@ -1074,11 +1067,14 @@
                 catch (AMQException e)
                 {
                     JMSException ex = new JMSException("Error registering consumer: " + e);
+
+                    //todo remove
+                    e.printStackTrace();
                     ex.setLinkedException(e);
                     throw ex;
                 }
 
-                synchronized(destination)
+                synchronized (destination)
                 {
                     _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
                     _destinationConsumerCount.get(destination).incrementAndGet();
@@ -1118,18 +1114,17 @@
     }
 
 
-    public void declareExchange(String name, String type) throws AMQException
+    public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
     {
-        declareExchange(name, type, _connection.getProtocolHandler());
+        declareExchange(name, type, getProtocolHandler());
     }
 
-    public void declareExchangeSynch(String name, String type) throws AMQException
+    public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             null,	// arguments
             false,	// autoDelete
             false,	// durable
@@ -1142,18 +1137,17 @@
         _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class);
     }
 
-    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)throws AMQException
+    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
     }
 
-    private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) throws AMQException
+    private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             null,	// arguments
             false,	// autoDelete
             false,	// durable
@@ -1175,7 +1169,7 @@
      * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
      * @throws AMQException
      */
-    private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+    private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
     {
         // For queues (but not topics) we generate the name in the client rather than the
         // server. This allows the name to be reused on failover if required. In general,
@@ -1185,38 +1179,35 @@
             amqd.setQueueName(protocolHandler.generateQueueName());
         }
 
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = QueueDeclareBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             null,	// arguments
             amqd.isAutoDelete(),	// autoDelete
             amqd.isDurable(),	// durable
             amqd.isExclusive(),	// exclusive
             true,	// nowait
             false,	// passive
-            amqd.getQueueName(),	// queue
+            amqd.getAMQQueueName(),	// queue
             0);	// ticket
 
         protocolHandler.writeRequest(_channelId, methodBody);
-        return amqd.getQueueName();
+        return amqd.getAMQQueueName();
     }
 
-    private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+    private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = QueueBindBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             ft,	// arguments
             amqd.getExchangeName(),	// exchange
             true,	// nowait
             queueName,	// queue
             amqd.getRoutingKey(),	// routingKey
             0);	// ticket
-
         protocolHandler.writeRequest(_channelId, methodBody);
     }
 
@@ -1226,23 +1217,23 @@
      * @param queueName
      * @return the consumer tag generated by the broker
      */
-    private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+    private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
                                   boolean nowait, String messageSelector) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
-        String tag = Integer.toString(_nextTag++);
+        AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
 
         FieldTable arguments = FieldTableFactory.newFieldTable();
         if (messageSelector != null && !messageSelector.equals(""))
         {
             arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
         }
-        if(consumer.isAutoClose())
+        if (consumer.isAutoClose())
         {
             arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
         }
-        if(consumer.isNoConsume())
+        if (consumer.isNoConsume())
         {
             arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
         }
@@ -1253,29 +1244,17 @@
 
         try
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            /*AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
-                (byte)0, (byte)9,	// AMQP version (major, minor)
-                arguments,	// arguments
-                tag,	// consumerTag
-                consumer.isExclusive(),	// exclusive
-                consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,	// noAck
-                consumer.isNoLocal(),	// noLocal
-                nowait,	// nowait
-                queueName,	// queue
-                0);	// ticket */
-        	
             AMQMethodBody methodBody = MessageConsumeBody.createMethodBody(
-        			                (byte)0, (byte)9,	// AMQP version (major, minor)
-        							tag,	// consumerTag
-				        			consumer.isExclusive(),	// exclusive
-				        			arguments, // arguments in the form of a field table
-				                    consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,	// noAck
-				                    consumer.isNoLocal(),	// noLocal
-				                    queueName, // queue
-				                    0); // ticket */
+                getProtocolMajorVersion(),    // AMQP major version
+                getProtocolMinorVersion(),    // AMQP minor version
+        		tag,	// consumerTag
+				consumer.isExclusive(),	// exclusive
+				arguments, // arguments in the form of a field table
+				consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,	// noAck
+				consumer.isNoLocal(),	// noLocal
+				queueName, // queue
+				0); // ticket */
         	/*
             if (nowait)
             {
@@ -1364,7 +1343,7 @@
 
         if (topicName.indexOf('/') == -1)
         {
-            return new AMQTopic(topicName);
+            return new AMQTopic(new AMQShortString(topicName));
         }
         else
         {
@@ -1434,12 +1413,21 @@
         }
         else
         {
+            AMQShortString topicName;
+            if (topic instanceof AMQTopic)
+            {
+                topicName = ((AMQTopic) topic).getDestinationName();
+            }
+            else
+            {
+                topicName = new AMQShortString(topic.getTopicName());
+            }
             // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
             // says we must trash the subscription.
-            if (isQueueBound(dest.getQueueName()) &&
-                !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+            if (isQueueBound(dest.getAMQQueueName()) &&
+                !isQueueBound(dest.getAMQQueueName(), topicName))
             {
-                deleteQueue(dest.getQueueName());
+                deleteQueue(dest.getAMQQueueName());
             }
         }
 
@@ -1451,15 +1439,14 @@
         return subscriber;
     }
 
-    void deleteQueue(String queueName) throws JMSException
+    void deleteQueue(AMQShortString queueName) throws JMSException
     {
         try
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
             AMQMethodBody methodBody = QueueDeleteBody.createMethodBody(
-                (byte)0, (byte)9,	// AMQP version (major, minor)
+                getProtocolMajorVersion(),    // AMQP major version
+                getProtocolMinorVersion(),    // AMQP minor version
                 false,	// ifEmpty
                 false,	// ifUnused
                 true,	// nowait
@@ -1504,7 +1491,7 @@
     {
         checkNotClosed();
         checkValidQueue(queue);
-        return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
+        return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1543,18 +1530,17 @@
         }
     }
 
-    boolean isQueueBound(String queueName) throws JMSException
+    boolean isQueueBound(AMQShortString queueName) throws JMSException
     {
         return isQueueBound(queueName, null);
     }
 
-    boolean isQueueBound(String queueName, String routingKey) throws JMSException
+    boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = ExchangeBoundBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             ExchangeDefaults.TOPIC_EXCHANGE_NAME,	// exchange
             queueName,	// queue
             routingKey);	// routingKey
@@ -1568,7 +1554,7 @@
             throw new JMSAMQException(e);
         }
         ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-        return (responseBody.replyCode == ExchangeBoundHandler.OK);
+        return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency
     }
 
     private void checkTransacted() throws JMSException
@@ -1600,7 +1586,7 @@
             _logger.debug("Message received in session with channel id " + _channelId);
         }
 
-        _unacknowledged.offer(message.deliveryTag);
+        _unacknowledged.offer(message.getDeliveryTag());
         _queue.add(message);
     }
 
@@ -1616,10 +1602,8 @@
      */
     public synchronized void acknowledgeMessage(long requestId, boolean multiple) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9);	// AMQP version (major, minor)
+        final AMQMethodBody methodBody = MessageOkBody.createMethodBody(getProtocolMajorVersion(), getProtocolMinorVersion());	// AMQP version (major, minor)
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending ack for request ID " + requestId + " on channel " + _channelId);
@@ -1659,7 +1643,7 @@
 
     void start()
     {
-        if (_dispatcher != null)
+        if (_startedAtLeastOnce.getAndSet(true))
         {
         	try{
             	//then we stopped this and are restarting, so signal server to resume delivery
@@ -1668,9 +1652,31 @@
 	        	_logger.error("Error Un Suspending Channel", e);
 	        }
         }
-        _dispatcher = new Dispatcher();
-        _dispatcher.setDaemon(true);
-        _dispatcher.start();
+
+        if(hasMessageListeners() && _dispatcher == null)
+        {
+            startDistpatcherIfNecessary();
+        }
+    }
+
+    private boolean hasMessageListeners()
+    {
+        return _hasMessageListeners;
+    }
+
+    void setHasMessageListeners()
+    {
+        _hasMessageListeners = true;
+    }
+
+    synchronized void startDistpatcherIfNecessary()
+    {
+        if(_dispatcher == null)
+        {
+            _dispatcher = new Dispatcher();
+            _dispatcher.setDaemon(true);
+            _dispatcher.start();
+        }
     }
 
     void stop()
@@ -1682,7 +1688,7 @@
         	_logger.error("Error Suspending Channel", e);
         }
         
-//stop the dispatcher thread
+        //stop the dispatcher thread
         _stopped.set(true);
     }
 
@@ -1701,11 +1707,11 @@
     {
         AMQDestination amqd = consumer.getDestination();
 
-        AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+        AMQProtocolHandler protocolHandler = getProtocolHandler();
 
         declareExchange(amqd, protocolHandler);
 
-        String queueName = declareQueue(amqd, protocolHandler);
+        AMQShortString queueName = declareQueue(amqd, protocolHandler);
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
@@ -1727,19 +1733,21 @@
      */
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
-        _consumers.remove(consumer.getConsumerTag());
-        String subscriptionName = _reverseSubscriptionMap.remove(consumer);
-        if (subscriptionName != null)
+        if (_consumers.remove(consumer.getConsumerTag()) != null)
         {
-            _subscriptions.remove(subscriptionName);
-        }
+            String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+            if (subscriptionName != null)
+            {
+                _subscriptions.remove(subscriptionName);
+            }
 
-        Destination dest = consumer.getDestination();
-        synchronized(dest)
-        {
-            if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            Destination dest = consumer.getDestination();
+            synchronized (dest)
             {
-                _destinationConsumerCount.remove(dest);
+                if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+                {
+                    _destinationConsumerCount.remove(dest);
+                }
             }
         }
     }
@@ -1773,7 +1781,7 @@
     private void resubscribeProducers() throws AMQException
     {
         ArrayList producers = new ArrayList(_producers.values());
-        _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove
+        _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
         for (Iterator it = producers.iterator(); it.hasNext();)
         {
             BasicMessageProducer producer = (BasicMessageProducer) it.next();
@@ -1796,11 +1804,10 @@
     private void suspendChannel() throws AMQException
     {
         _logger.warn("Suspending channel");
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = ChannelFlowBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             false);	// active
         _connection.getProtocolHandler().writeRequest(_channelId, methodBody);
     }
@@ -1808,19 +1815,18 @@
     private void unsuspendChannel() throws AMQException
     {
         _logger.warn("Unsuspending channel");
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = ChannelFlowBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            getProtocolMajorVersion(),    // AMQP major version
+            getProtocolMinorVersion(),    // AMQP minor version
             true);	// active
         _connection.getProtocolHandler().writeRequest(_channelId, methodBody);
     }
 
-    public void confirmConsumerCancelled(String consumerTag)
+    public void confirmConsumerCancelled(AMQShortString consumerTag)
     {
         BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
-        if((consumer != null) && (consumer.isAutoClose()))
+        if ((consumer != null) && (consumer.isAutoClose()))
         {
             consumer.closeWhenNoMessages(true);
         }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.framing.AMQShortString;
+
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
@@ -38,7 +40,7 @@
      */
     public AMQTemporaryQueue(AMQSession session)
     {
-        super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+        super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
         _session = session;
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 import javax.jms.Topic;
@@ -40,10 +41,21 @@
 
     public AMQTopic(String name)
     {
+        this(new AMQShortString(name));
+    }
+
+    public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+    {
+        super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
+    }
+
+
+    public AMQTopic(AMQShortString name)
+    {
         this(name, true, null, false);
     }
 
-    public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+    public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
     {
         super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
               queueName, isDurable);
@@ -56,17 +68,17 @@
                             true);
     }
 
-    public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
+    public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
     {
-        return connection.getClientID() + ":" + subscriptionName;
+        return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
     }
 
     public String getTopicName() throws JMSException
     {
-        return super.getDestinationName();
+        return super.getDestinationName().toString();
     }
 
-     public String getRoutingKey()
+     public AMQShortString getRoutingKey()
     {
         return getDestinationName();
     }