You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 13:13:28 UTC

svn commit: r496666 [2/3] - in /incubator/qpid/branches/perftesting/qpid: java/broker/ java/broker/src/main/grammar/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/filter/ java/broker/src/main/java/or...

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Jan 16 04:13:19 2007
@@ -23,12 +23,21 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 
+import java.util.Queue;
+
 /**
  * Encapsulation of a supscription to a queue.
  * <p/>
@@ -48,23 +57,30 @@
 
     private final Object sessionKey;
 
+    private Queue<AMQMessage> _messages;
+
+    private final boolean _noLocal;
+
     /**
      * True if messages need to be acknowledged
      */
     private final boolean _acks;
+    private FilterManager _filters;
+    private final boolean _isBrowser;
+    private final Boolean _autoClose;
+    private boolean _closed = false;
 
     public static class Factory implements SubscriptionFactory
     {
-        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
-                throws AMQException
+        public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
         }
 
         public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
         }
     }
 
@@ -72,6 +88,13 @@
                             String consumerTag, boolean acks)
             throws AMQException
     {
+        this(channelId, protocolSession, consumerTag, acks, null, false);
+    }
+
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+            throws AMQException
+    {
         AMQChannel channel = protocolSession.getChannel(channelId);
         if (channel == null)
         {
@@ -83,8 +106,61 @@
         this.consumerTag = consumerTag;
         sessionKey = protocolSession.getKey();
         _acks = acks;
+        _noLocal = noLocal;
+
+        _filters = FilterManagerFactory.createManager(filters);
+
+
+        if (_filters != null)
+        {
+            Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+            if (isBrowser != null)
+            {
+                _isBrowser = (Boolean) isBrowser;
+            }
+            else
+            {
+                _isBrowser = false;
+            }
+        }
+        else
+        {
+            _isBrowser = false;
+        }
+
+
+        if (_filters != null)
+        {
+            Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+            if (autoClose != null)
+            {
+                _autoClose = (Boolean) autoClose;
+            }
+            else
+            {
+                _autoClose = false;
+            }
+        }
+        else
+        {
+            _autoClose = false;
+        }
+
+
+        if (_filters != null)
+        {
+            _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
+        }
+        else
+        {
+            // Reference the DeliveryManager
+            _messages = null;
+        }
     }
 
+
     public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
                             String consumerTag)
             throws AMQException
@@ -129,9 +205,50 @@
     {
         if (msg != null)
         {
+            if (_isBrowser)
+            {
+                sendToBrowser(msg, queue);
+            }
+            else
+            {
+                sendToConsumer(msg, queue);
+            }
+        }
+        else
+        {
+            _logger.error("Attempt to send Null message", new NullPointerException());
+        }
+    }
+
+    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    {
+        // We don't decrement the reference here as we don't want to consume the message
+        // but we do want to send it to the client.
+
+        synchronized(channel)
+        {
+            long deliveryTag = channel.getNextDeliveryTag();
+
+            // Unless acks are required, we don't add the message to the unacknowledgedMap: we don't need to know if
+            // the client received the message. If it is lost in transit that is not important.
+            if (_acks)
+            {
+                channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+            }
+            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+            AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+            protocolSession.writeFrame(frame);
+        }
+    }
+
+    private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    {
+        try
+        {
             // if we do not need to wait for client acknowledgements
-            // we can decrement the reference count immediately. 
-            
+            // we can decrement the reference count immediately.
+
             // By doing this _before_ the send we ensure that it
             // doesn't get sent if it can't be dequeued, preventing
             // duplicate delivery on recovery.
@@ -157,9 +274,9 @@
                 protocolSession.writeFrame(frame);
             }
         }
-        else
+        finally
         {
-            _logger.error("Attempt to send Null message", new NullPointerException());
+            msg.setDeliveredToConsumer();
         }
     }
 
@@ -177,6 +294,101 @@
     {
         channel.queueDeleted(queue);
     }
+
+    public boolean hasFilters()
+    {
+        return _filters != null;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        if (_noLocal)
+        {
+            // We don't want local messages so check to see if message is one we sent
+            if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+                    msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+            {
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+                                  System.identityHashCode(msg) + ")");
+                }
+                return false;
+            }
+            else // if not then filter the message.
+            {
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+                                  ") but not ours so filtering");
+                }
+                return checkFilters(msg);
+            }
+        }
+        else
+        {
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+            }
+            return checkFilters(msg);
+        }
+    }
+
+    private boolean checkFilters(AMQMessage msg)
+    {
+        if (_filters != null)
+        {
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+            }
+            return _filters.allAllow(msg);
+        }
+        else
+        {
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+            }
+
+            return true;
+        }
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return _messages;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        if (_messages != null)
+        {
+            _messages.offer(msg);
+        }
+    }
+
+    public boolean isAutoClose()
+    {
+        return _autoClose;
+    }
+
+    public void close()
+    {
+        if (!_closed)
+        {
+            _logger.info("Closing autoclose subscription:" + this);
+            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+            _closed = true;
+        }
+    }
+
+    public boolean isBrowser()
+    {
+        return _isBrowser;
+    }
+
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Jan 16 04:13:19 2007
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.List;
+
 /**
  * Abstraction of actor that will determine the subscriber to whom
  * a message will be sent.
  */
 public interface SubscriptionManager
 {
+    public List<Subscription> getSubscriptions();
     public boolean hasActiveSubscribers();
     public Subscription nextSubscriber(AMQMessage msg);
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Jan 16 04:13:19 2007
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
 import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@
 
     /**
      * Remove the subscription, returning it if it was found
+     *
      * @param subscription
      * @return null if no match was found
      */
@@ -90,7 +93,7 @@
 
     /**
      * Return the next unsuspended subscription or null if not found.
-     *
+     * <p/>
      * Performance note:
      * This method can scan all items twice when looking for a subscription that is not
      * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,31 +108,51 @@
             return null;
         }
 
-        try {
-            final Subscription result = nextSubscriber();
-            if (result == null) {
+        try
+        {
+            final Subscription result = nextSubscriberImpl(msg);
+            if (result == null)
+            {
                 _currentSubscriber = 0;
-                return nextSubscriber();
-            } else {
+                return nextSubscriberImpl(msg);
+            }
+            else
+            {
                 return result;
             }
-        } catch (IndexOutOfBoundsException e) {
+        }
+        catch (IndexOutOfBoundsException e)
+        {
             _currentSubscriber = 0;
-            return nextSubscriber();
+            return nextSubscriber(msg);
         }
     }
 
-    private Subscription nextSubscriber()
+    private Subscription nextSubscriberImpl(AMQMessage msg)
     {
         final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
-        while (iterator.hasNext()) {
+        while (iterator.hasNext())
+        {
             Subscription subscription = iterator.next();
             ++_currentSubscriber;
             subscriberScanned();
-            if (!subscription.isSuspended()) {
-                return subscription;
+
+            if (!subscription.isSuspended())
+            {
+                if (subscription.hasInterest(msg))
+                {
+                    // a subscription with no filters, or whose pre-delivery queue (PDQ) is empty, is ready to receive a message.
+                    //FIXME the queue could be full of sent messages.
+                    // Either need to clean all PDQs after sending a message
+                    // OR have a clean up thread that runs the PDQs expunging the messages.
+                    if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+                    {
+                        return subscription;
+                    }
+                }
             }
         }
+
         return null;
     }
 
@@ -145,11 +168,19 @@
         return _subscriptions.isEmpty();
     }
 
+    public List<Subscription> getSubscriptions()
+    {
+        return _subscriptions;
+    }
+
     public boolean hasActiveSubscribers()
     {
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) return true;
+            if (!s.isSuspended())
+            {
+                return true;
+            }
         }
         return false;
     }
@@ -159,7 +190,10 @@
         int count = 0;
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) count++;
+            if (!s.isSuspended())
+            {
+                count++;
+            }
         }
         return count;
     }
@@ -167,6 +201,7 @@
     /**
      * Notification that a queue has been deleted. This is called so that the subscription can inform the
      * channel, which in turn can update its list of unacknowledged messages.
+     *
      * @param queue
      */
     public void queueDeleted(AMQQueue queue)
@@ -177,7 +212,8 @@
         }
     }
 
-    int size() {
+    int size()
+    {
         return _subscriptions.size();
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Tue Jan 16 04:13:19 2007
@@ -35,7 +35,7 @@
  */
 class SynchronizedDeliveryManager implements DeliveryManager
 {
-    private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+    private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
 
     /**
      * Holds any queued messages
@@ -122,6 +122,11 @@
         return new ArrayList<AMQMessage>(_messages);
     }
 
+    public void populatePreDeliveryQueue(Subscription subscription)
+    {
+        // No-op: this DeliveryManager has no PreDeliveryQueues.
+    }
+
     public synchronized void removeAMessageFromTop() throws AMQException
     {
         AMQMessage msg = poll();
@@ -243,7 +248,6 @@
                 else
                 {
                     s.send(msg, _queue);
-                    msg.setDeliveredToConsumer();
                 }
             }
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java Tue Jan 16 04:13:19 2007
@@ -62,7 +62,7 @@
             }
 
             // we do not currently support authcid in any meaningful way
-            String authcid = new String(response, 0, authzidNullPosition, "utf8");
+            // String authcid = new String(response, 0, authzidNullPosition, "utf8");
             String authzid = new String(response, authzidNullPosition + 1, authcidNullPosition - 1, "utf8");
 
             // we do not care about the prompt but it throws if null

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java Tue Jan 16 04:13:19 2007
@@ -20,10 +20,15 @@
  */
 package org.apache.qpid.server.util;
 
+import org.apache.log4j.Logger;
+
 import java.util.Iterator;
 
 public class CircularBuffer implements Iterable
 {
+
+    private static final Logger _logger = Logger.getLogger(CircularBuffer.class);
+
     private final Object[] _log;
     private int _size;
     private int _index;
@@ -102,7 +107,7 @@
     {
         for(Object o : this)
         {
-         System.out.println(o);   
+         _logger.info(o);
         }
     }
 
@@ -120,7 +125,7 @@
         for(String s : items)
         {
             buffer.add(s);
-            System.out.println(buffer);
+            _logger.info(buffer);
         }
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jan 16 04:13:19 2007
@@ -139,6 +139,12 @@
     */
     private AMQException _lastAMQException = null;
 
+
+    /*
+     * The connection meta data
+     */
+    private QpidConnectionMetaData _connectionMetaData;
+
     public AMQConnection(String broker, String username, String password,
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
@@ -281,6 +287,7 @@
 
             throw e;
         }
+        _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
     protected boolean checkException(Throwable thrown)
@@ -550,7 +557,7 @@
     public ConnectionMetaData getMetaData() throws JMSException
     {
         checkNotClosed();
-        return QpidConnectionMetaData.instance();
+        return _connectionMetaData;
         
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 16 04:13:19 2007
@@ -23,13 +23,15 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -69,15 +71,15 @@
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
     /**
-     *  Used to reference durable subscribers so they requests for unsubscribe can be handled
-     *  correctly.  Note this only keeps a record of subscriptions which have been created
-     *  in the current instance.  It does not remember subscriptions between executions of the
-     *  client
+     * Used to reference durable subscribers so they requests for unsubscribe can be handled
+     * correctly.  Note this only keeps a record of subscriptions which have been created
+     * in the current instance.  It does not remember subscriptions between executions of the
+     * client
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-                new ConcurrentHashMap<BasicMessageConsumer, String>();
+            new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
      * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -143,6 +145,7 @@
     private boolean _inRecovery;
 
 
+
     /**
      * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
      */
@@ -176,7 +179,7 @@
         {
             if (message.deliverBody != null)
             {
-                final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
 
                 if (consumer == null)
                 {
@@ -210,17 +213,15 @@
                     {
                         _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
                     }
+                    else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+                    {
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                    }
                     else
                     {
-                        if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                        {
-                            _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                        }
-                        else
-                        {
-                            _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                        }
+                        _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
                     }
+
                 }
                 catch (Exception e)
                 {
@@ -318,7 +319,7 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -334,7 +335,7 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -350,7 +351,7 @@
 
     public javax.jms.Message createMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -366,7 +367,7 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -382,7 +383,7 @@
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -400,7 +401,7 @@
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -417,7 +418,7 @@
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -434,7 +435,7 @@
 
     public TextMessage createTextMessage(String text) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -504,7 +505,7 @@
     {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             //Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
@@ -569,7 +570,7 @@
      */
     public void closed(Throwable e)
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will be passed in when closure occurs as a
             // result of a channel close request
@@ -721,11 +722,11 @@
 
     public void acknowledge() throws JMSException
     {
-        if(isClosed())
+        if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
         }
-        for(BasicMessageConsumer consumer : _consumers.values())
+        for (BasicMessageConsumer consumer : _consumers.values())
         {
             consumer.acknowledge();
         }
@@ -734,7 +735,6 @@
     }
 
 
-
     public MessageListener getMessageListener() throws JMSException
     {
         checkNotClosed();
@@ -843,7 +843,9 @@
                                   false,
                                   false,
                                   null,
-                                  null);
+                                  null,
+                                  false,
+                                  false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@
                                   false,
                                   false,
                                   messageSelector,
-                                  null);
+                                  null,
+                                  false,
+                                  false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@
                                   noLocal,
                                   false,
                                   messageSelector,
-                                  null);
+                                  null,
+                                  false,
+                                  false);
+    }
+
+    public MessageConsumer createBrowserConsumer(Destination destination,
+                                         String messageSelector,
+                                         boolean noLocal)
+            throws JMSException
+    {
+        checkValidDestination(destination);
+        return createConsumerImpl(destination,
+                                  _defaultPrefetchHighMark,
+                                  _defaultPrefetchLowMark,
+                                  noLocal,
+                                  false,
+                                  messageSelector,
+                                  null,
+                                  true,
+                                  true);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
     }
 
 
@@ -890,7 +913,7 @@
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@
     {
         checkValidDestination(destination);
         return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
-                                  selector, rawSelector);
+                                  selector, rawSelector, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@
     {
         checkValidDestination(destination);
         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
-                                  selector, rawSelector);
+                                  selector, rawSelector, false, false);
     }
 
     protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@
                                                  final boolean noLocal,
                                                  final boolean exclusive,
                                                  final String selector,
-                                                 final FieldTable rawSelector) throws JMSException
+                                                 final FieldTable rawSelector,
+                                                 final boolean noConsume,
+                                                 final boolean autoClose) throws JMSException
     {
         checkTemporaryDestination(destination);
 
@@ -948,12 +973,18 @@
                 BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
                                                                          _messageFactoryRegistry, AMQSession.this,
                                                                          protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
-                                                                         _acknowledgeMode);
+                                                                         _acknowledgeMode, noConsume, autoClose);
 
                 try
                 {
                     registerConsumer(consumer, false);
                 }
+                catch (AMQInvalidSelectorException ise)
+                {
+                    JMSException ex = new InvalidSelectorException(ise.getMessage());
+                    ex.setLinkedException(ise);
+                    throw ex;
+                }
                 catch (AMQException e)
                 {
                     JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +994,7 @@
 
                 synchronized(destination)
                 {
-                    _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+                    _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
                     _destinationConsumerCount.get(destination).incrementAndGet();
                 }
 
@@ -975,16 +1006,16 @@
     private void checkTemporaryDestination(Destination destination)
             throws JMSException
     {
-        if((destination instanceof TemporaryDestination))
+        if ((destination instanceof TemporaryDestination))
         {
             _logger.debug("destination is temporary");
             final TemporaryDestination tempDest = (TemporaryDestination) destination;
-            if(tempDest.getSession() != this)
+            if (tempDest.getSession() != this)
             {
                 _logger.debug("destination is on different session");
                 throw new JMSException("Cannot consume from a temporary destination created onanother session");
             }
-            if(tempDest.isDeleted())
+            if (tempDest.isDeleted())
             {
                 _logger.debug("destination is deleted");
                 throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1096,26 @@
      * @return the consumer tag generated by the broker
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
-                                  boolean nowait) throws AMQException
+                                  boolean nowait, String messageSelector) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         String tag = Integer.toString(_nextTag++);
 
+        FieldTable arguments = FieldTableFactory.newFieldTable();
+        if (messageSelector != null && !messageSelector.equals(""))
+        {
+            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+        }
+        if(consumer.isAutoClose())
+        {
+            arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+        }
+        if(consumer.isNoConsume())
+        {
+            arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+        }
+
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
@@ -1080,7 +1125,7 @@
             AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
                                                                   queueName, tag, consumer.isNoLocal(),
                                                                   consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                                                                  consumer.isExclusive(), nowait);
+                                                                  consumer.isExclusive(), nowait, arguments);
             if (nowait)
             {
                 protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1265,7 @@
     {
         checkNotClosed();
         checkValidTopic(topic);
-        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
         if (subscriber != null)
         {
@@ -1247,8 +1292,8 @@
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
 
-        _subscriptions.put(name,subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
         return subscriber;
     }
@@ -1278,8 +1323,8 @@
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-        _subscriptions.put(name,subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
         return subscriber;
     }
 
@@ -1291,16 +1336,14 @@
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
-        checkNotClosed();
-        checkValidQueue(queue);
-        throw new UnsupportedOperationException("Queue browsing not supported");
+        return createBrowser(queue, null);
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
         checkNotClosed();
         checkValidQueue(queue);
-        throw new UnsupportedOperationException("Queue browsing not supported");
+        return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1476,7 +1519,14 @@
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
-        consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+        try
+        {
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+        }
+        catch (JMSException e) //thrown by getMessageSelector
+        {
+            throw new AMQException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -1489,7 +1539,7 @@
     {
         _consumers.remove(consumer.getConsumerTag());
         String subscriptionName = _reverseSubscriptionMap.remove(consumer);
-        if(subscriptionName != null)
+        if (subscriptionName != null)
         {
             _subscriptions.remove(subscriptionName);
         }
@@ -1497,7 +1547,7 @@
         Destination dest = consumer.getDestination();
         synchronized(dest)
         {
-            if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
             {
                 _destinationConsumerCount.remove(dest);
             }
@@ -1567,6 +1617,16 @@
         _connection.getProtocolHandler().writeFrame(channelFlowFrame);
     }
 
+    public void confirmConsumerCancelled(String consumerTag)
+    {
+        BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+        if((consumer != null) && (consumer.isAutoClose()))
+        {
+            consumer.closeWhenNoMessages(true);
+        }
+    }
+
+
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
@@ -1576,7 +1636,7 @@
         {
             throw new javax.jms.InvalidDestinationException("Invalid Topic");
         }
-        if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+        if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
         {
             throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
         }
@@ -1597,4 +1657,5 @@
             throw new javax.jms.InvalidDestinationException("Invalid Queue");
         }
     }
+
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,8 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
@@ -39,6 +41,7 @@
 import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.Destination;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -142,10 +145,19 @@
      */
     private Thread _receivingThread;
 
+    /**
+     *  autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+     *  on the queue.  This is used for queue browsing.
+     */
+    private boolean _autoClose;
+    private boolean _closeWhenNoMessages;
+
+    private boolean _noConsume;
+
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
-                         boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-                         AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
-                         int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+                                   boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+                                   AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+                                   int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
         _connection = connection;
@@ -161,6 +173,8 @@
         _exclusive = exclusive;
         _acknowledgeMode = acknowledgeMode;
         _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+        _autoClose = autoClose;
+        _noConsume = noConsume;
     }
 
     public AMQDestination getDestination()
@@ -241,6 +255,17 @@
         if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
         {
             _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+            String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString());
+            try
+            {
+                Destination dest = AMQDestination.createDestination(new AMQBindingURL(url));
+                jmsMsg.setJMSDestination(dest);
+            }
+            catch (URLSyntaxException e)
+            {
+                _logger.warn("Unable to parse the supplied destination header: " + url);
+            }
+                        
         }
         _session.setInRecovery(false);
     }
@@ -307,6 +332,10 @@
 
         try
         {
+            if(closeOnAutoClose())
+            {
+                return null;
+            }
             Object o = null;
             if (l > 0)
             {
@@ -336,6 +365,19 @@
         }
     }
 
+    private boolean closeOnAutoClose() throws JMSException
+    {
+        if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+        {
+            close(false);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
     public Message receiveNoWait() throws JMSException
     {
     	checkPreConditions();
@@ -344,6 +386,10 @@
 
         try
         {
+            if(closeOnAutoClose())
+            {
+                return null;
+            }
             Object o = _synchronousQueue.poll();
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
@@ -388,22 +434,31 @@
         }
     }
 
+
     public void close() throws JMSException
     {
+        close(true);
+    }
+
+    public void close(boolean sendClose) throws JMSException
+    {
         synchronized(_connection.getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
-                final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
-                try
+                if(sendClose)
                 {
-                    _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
-                }
-                catch (AMQException e)
-                {
-                    _logger.error("Error closing consumer: " + e, e);
-                    throw new JMSException("Error closing consumer: " + e);
+                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+                    try
+                    {
+                        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+                    }
+                    catch (AMQException e)
+                    {
+                        _logger.error("Error closing consumer: " + e, e);
+                        throw new JMSException("Error closing consumer: " + e);
+                    }
                 }
 
                 deregisterConsumer();
@@ -499,6 +554,12 @@
     	msg.setJMSDestination(_destination);
         switch (_acknowledgeMode)
         {
+            case Session.CLIENT_ACKNOWLEDGE:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                break;
             case Session.DUPS_OK_ACKNOWLEDGE:
                 if (++_outstanding >= _prefetchHigh)
                 {
@@ -525,7 +586,14 @@
                 }
                 break;
             case Session.SESSION_TRANSACTED:
-                _lastDeliveryTag = msg.getDeliveryTag();
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    _lastDeliveryTag = msg.getDeliveryTag();
+                }
                 break;
         }
     }
@@ -615,5 +683,30 @@
     public void clearUnackedMessages()
     {
         _unacknowledgedDeliveryTags.clear();
+    }
+
+    public boolean isAutoClose()
+    {
+        return _autoClose;
+    }
+
+
+    public boolean isNoConsume()
+    {
+        return _noConsume;
+    }
+
+    public void closeWhenNoMessages(boolean b)
+    {
+        _closeWhenNoMessages = b;
+
+        if(_closeWhenNoMessages
+                && _synchronousQueue.isEmpty() 
+                && _receiving.get()
+                && _messageListener != null)
+        {
+            _receivingThread.interrupt();
+        }
+
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Jan 16 04:13:19 2007
@@ -507,8 +507,11 @@
                             long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
         checkTemporaryDestination(destination);
+        origMessage.setJMSDestination(destination);
 
+        
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
+        message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
         AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
                                                                 destination.getRoutingKey(), mandatory, immediate);
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Tue Jan 16 04:13:19 2007
@@ -1,50 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.client;
 
+import org.apache.qpid.common.QpidProperties;
+
 import java.util.Enumeration;
 
 import javax.jms.ConnectionMetaData;
 import javax.jms.JMSException;
 
-public class QpidConnectionMetaData implements ConnectionMetaData {
+public class QpidConnectionMetaData implements ConnectionMetaData
+{
+
+
+    QpidConnectionMetaData(AMQConnection conn)
+    {
+    }
+
+    public int getJMSMajorVersion() throws JMSException
+    {
+        return 1;
+    }
+
+    public int getJMSMinorVersion() throws JMSException
+    {
+        return 1;
+    }
+
+    public String getJMSProviderName() throws JMSException
+    {
+        return "Apache " + QpidProperties.getProductName();
+    }
+
+    public String getJMSVersion() throws JMSException
+    {
+        return "1.1";
+    }
+
+    public Enumeration getJMSXPropertyNames() throws JMSException
+    {
+        return CustomJMXProperty.asEnumeration();
+    }
+
+    public int getProviderMajorVersion() throws JMSException
+    {
+        return 0;
+    }
+
+    public int getProviderMinorVersion() throws JMSException
+    {
+        return 8;
+    }
+
+    public String getProviderVersion() throws JMSException
+    {
+        return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+               + getProtocolVersion() + "] )";
+    }
+
+    private String getProtocolVersion()
+    {
+        // TODO - Implement based on connection negotiated protocol
+        return "0.8";
+    }
+
+    public String getBrokerVersion()
+    {
+        // TODO - get broker version
+        return "<unkown>";
+    }
+
+    public String getClientVersion()
+    {
+        return QpidProperties.getBuildVerision();
+    }
+
 
-	private static QpidConnectionMetaData _instance = new QpidConnectionMetaData();
-	
-	private QpidConnectionMetaData(){		
-	}
-	
-	public static QpidConnectionMetaData instance(){
-		return _instance;
-	}	
-	
-	public int getJMSMajorVersion() throws JMSException {
-		return 1;
-	}
-
-	public int getJMSMinorVersion() throws JMSException {
-		return 1;
-	}
-
-	public String getJMSProviderName() throws JMSException {
-		return "Apache Qpid";
-	}
-
-	public String getJMSVersion() throws JMSException {
-		return "1.1";
-	}
-
-	public Enumeration getJMSXPropertyNames() throws JMSException {
-		return null;
-	}
-
-	public int getProviderMajorVersion() throws JMSException {
-		return 0;
-	}
-
-	public int getProviderMinorVersion() throws JMSException {
-		return 9;
-	}
-
-	public String getProviderVersion() throws JMSException {
-		return "Incubating-M1";
-	}
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@
 
     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
-         _logger.debug("ChannelClose method received");
+        _logger.debug("ChannelClose method received");
         ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
 
         int errorCode = method.replyCode;
@@ -65,17 +66,21 @@
             {
                 throw new AMQNoConsumersException("Error: " + reason, null);
             }
+            else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+            {
+                throw new AMQNoRouteException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+            {
+                _logger.info("Broker responded with Invalid Selector.");
+
+                throw new AMQInvalidSelectorException(reason);
+            }
             else
             {
-                if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                {
-                   throw new AMQNoRouteException("Error: " + reason, null);
-                }
-                else
-                {
-                    throw new AMQChannelClosedException(errorCode, "Error: " + reason);
-                }
+                throw new AMQChannelClosedException(errorCode, "Error: " + reason);
             }
+
         }
         evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -20,20 +20,14 @@
  */
 package org.apache.qpid.client.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
 
 public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
 {
-
-    private static final Logger _logger = Logger.getLogger(ConnectionOpenOkMethodHandler.class);
-
     private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
 
     public static ConnectionOpenOkMethodHandler getInstance()
@@ -47,8 +41,6 @@
 
     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
-        AMQProtocolSession session = evt.getProtocolSession();
-        ConnectionOpenOkBody method = (ConnectionOpenOkBody) evt.getMethod();        
         stateManager.changeState(AMQState.CONNECTION_OPEN);
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,8 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,10 +121,11 @@
 
             stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
             FieldTable clientProperties = FieldTableFactory.newFieldTable();
-            clientProperties.put("instance", ps.getClientID());
-            clientProperties.put("product", "Qpid");
-            clientProperties.put("version", "1.0");
-            clientProperties.put("platform", getFullSystemInfo());
+            
+            clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+            clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+            clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision());
+            clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
             ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
                                                                saslResponse, selectedLocale));
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Jan 16 04:13:19 2007
@@ -26,7 +26,8 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 
@@ -46,7 +47,8 @@
 
     protected ByteBuffer _data;
     private boolean _readableProperties = false;
-    private boolean _readableMessage = false;
+    protected boolean _readableMessage = false;
+    protected boolean _changedData;
     private Destination _destination;
     private BasicMessageConsumer _consumer;
 
@@ -60,6 +62,7 @@
         }
         _readableProperties = false;
         _readableMessage = (data != null);
+        _changedData = (data == null);
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -172,13 +175,12 @@
 
     public Destination getJMSDestination() throws JMSException
     {
-        // TODO: implement this once we have sorted out how to figure out the exchange class
-    	return _destination;
+        return _destination;
     }
 
     public void setJMSDestination(Destination destination) throws JMSException
     {
-    	_destination = destination;
+        _destination = destination;
     }
 
     public int getJMSDeliveryMode() throws JMSException
@@ -522,16 +524,16 @@
         return !_readableMessage;
     }
 
-    public void reset() 
+    public void reset()
     {
-        if (_readableMessage)
+        if (!_changedData)
         {
             _data.rewind();
         }
         else
         {
             _data.flip();
-            _readableMessage = true;
+            _changedData = false;
         }
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Tue Jan 16 04:13:19 2007
@@ -59,6 +59,12 @@
         super(messageNbr, contentHeader, data);
     }
 
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
     public String getMimeType()
     {
         return MIME_TYPE;
@@ -226,48 +232,56 @@
     public void writeBoolean(boolean b) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.put(b ? (byte) 1 : (byte) 0);
     }
 
     public void writeByte(byte b) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.put(b);
     }
 
     public void writeShort(short i) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putShort(i);
     }
 
     public void writeChar(char c) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putChar(c);
     }
 
     public void writeInt(int i) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putInt(i);
     }
 
     public void writeLong(long l) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putLong(l);
     }
 
     public void writeFloat(float v) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putFloat(v);
     }
 
     public void writeDouble(double v) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putDouble(v);
     }
 
@@ -281,7 +295,7 @@
             
             _data.putShort((short)encodedString.limit());
             _data.put(encodedString);
-
+            _changedData = true;
             //_data.putString(string, Charset.forName("UTF-8").newEncoder());
             // we must add the null terminator manually
             //_data.put((byte)0);
@@ -298,12 +312,14 @@
     {
         checkWritable();
         _data.put(bytes);
+        _changedData = true;
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
     {
         checkWritable();
         _data.put(bytes, offset, length);
+        _changedData = true;
     }
 
     public void writeObject(Object object) throws JMSException

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Tue Jan 16 04:13:19 2007
@@ -112,7 +112,7 @@
         }
 
     }
-
+  
     public Serializable getObject() throws JMSException
     {
         ObjectInputStream in = null;
@@ -123,18 +123,18 @@
 
         try
         {
-        	_data.rewind();
+            _data.rewind();
             in = new ObjectInputStream(_data.asInputStream());
             return (Serializable) in.readObject();
         }
         catch (IOException e)
-        {           
-           e.printStackTrace();
-           throw new MessageFormatException("Could not deserialize message: " + e);
+        {
+            e.printStackTrace();
+            throw new MessageFormatException("Could not deserialize message: " + e);
         }
         catch (ClassNotFoundException e)
         {
-        	e.printStackTrace();
+            e.printStackTrace();
             throw new MessageFormatException("Could not deserialize message: " + e);
         }
         finally

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Tue Jan 16 04:13:19 2007
@@ -86,6 +86,12 @@
         super(messageNbr, contentHeader, data);
     }
 
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
     public String getMimeType()
     {
         return MIME_TYPE;
@@ -103,6 +109,7 @@
     {
         checkWritable();
         _data.put(type);
+        _changedData = true;
     }
 
     public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@
             {
                 _data.putString(string, Charset.forName("UTF-8").newEncoder());
                 // we must write the null terminator ourselves
-                _data.put((byte)0);
+                _data.put((byte) 0);
             }
             catch (CharacterCodingException e)
             {
@@ -706,7 +713,7 @@
 
     public void writeBytes(byte[] bytes) throws JMSException
     {
-        writeBytes(bytes, 0, bytes == null?0:bytes.length);
+        writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Tue Jan 16 04:13:19 2007
@@ -117,6 +117,7 @@
                 {
                     _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
                 }
+                _changedData=true;
             }
             _decodedValue = text;
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Jan 16 04:13:19 2007
@@ -406,4 +406,12 @@
             HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
+
+    public void confirmConsumerCancelled(int channelId, String consumerTag)
+    {
+        final Integer chId = channelId;
+        final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+        session.confirmConsumerCancelled(consumerTag);
+    }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Jan 16 04:13:19 2007
@@ -110,7 +110,7 @@
             }
             else
             {
-                throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+                throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
             }
         }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Jan 16 04:13:19 2007
@@ -103,6 +103,7 @@
         frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
         frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
         frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+        frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
         frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
         frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
         frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Jan 16 04:13:19 2007
@@ -59,7 +59,7 @@
         // once more testing of the performance of the simple allocator has been done
         if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
         {
-            _logger.warn("Using SimpleByteBufferAllocator");
+            _logger.info("Using SimpleByteBufferAllocator");
             ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
         }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Tue Jan 16 04:13:19 2007
@@ -127,7 +127,7 @@
         _logger.info("Starting connection");
         con.start();
         TextMessage tm = (TextMessage) consumer.receive();
-        TextMessage tm2 = (TextMessage) consumer.receive();
+        consumer.receive();
         tm.acknowledge();
         _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java Tue Jan 16 04:13:19 2007
@@ -31,7 +31,7 @@
 public class MultipleConnectionTest extends TestCase
 {
     public static final String _defaultBroker = "vm://:1";
-    public static String _connectionString = _defaultBroker;
+    public String _connectionString = _defaultBroker;
 
     private static class Receiver
     {
@@ -175,9 +175,6 @@
     public static void main(String[] argv) throws Exception
     {
         String broker = argv.length > 0 ? argv[0] : _defaultBroker;
-
-        int connections = 7;
-        int sessions = 2;
 
         MultipleConnectionTest test = new MultipleConnectionTest();
         test._connectionString = broker;

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Jan 16 04:13:19 2007
@@ -116,7 +116,9 @@
             m.setIntProperty("Int", (int) Integer.MAX_VALUE);
 
             m.setJMSCorrelationID("Correlation");
-            m.setJMSPriority(100);
+            //fixme m.setJMSPriority has no effect; the priority set on the producer (8) is the one asserted on receipt
+            producer.setPriority(8);
+            m.setJMSPriority(3);
 
             //  Queue
             Queue q;
@@ -182,10 +184,8 @@
                                 (int) Integer.MAX_VALUE, m.getIntProperty("Int"));
             Assert.assertEquals("Check CorrelationID properties are correctly transported",
                                 "Correlation", m.getJMSCorrelationID());
-
-            _logger.warn("getJMSPriority not being verified.");
-//            Assert.assertEquals("Check Priority properties are correctly transported",
-//                                100, m.getJMSPriority());
+            Assert.assertEquals("Check Priority properties are correctly transported",
+                                8, m.getJMSPriority());
 
             // Queue
             Assert.assertEquals("Check ReplyTo properties are correctly transported",

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Tue Jan 16 04:13:19 2007
@@ -121,7 +121,7 @@
     {
         if (_connection != null)
         {
-            System.out.println(">>>>>>>>>>>>>>.. closing");
+            _log.info(">>>>>>>>>>>>>>.. closing");
             _connection.close();
         }
     }
@@ -137,7 +137,7 @@
         {
             public void onException(JMSException jmsException)
             {
-                _log.error("onException - ", jmsException);
+                _log.warn("onException - "+jmsException.getMessage());
             }
         });
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java Tue Jan 16 04:13:19 2007
@@ -25,7 +25,6 @@
 import javax.jms.Connection;
 import javax.jms.Session;
 import javax.jms.MessageConsumer;
-import javax.jms.Message;
 
 /**
  * @author Apache Software Foundation
@@ -72,7 +71,7 @@
         };
         long startTime = System.currentTimeMillis();
         new Thread(r).start();
-        Message m = consumer.receive(10000);
+        consumer.receive(10000);
         assertTrue(System.currentTimeMillis() - startTime < 10000);
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Tue Jan 16 04:13:19 2007
@@ -54,8 +54,7 @@
     {
         try
         {
-            Connection connection = new AMQConnection(_broker, "guest", "guest",
-                                                      "fred", "/test");
+            new AMQConnection(_broker, "guest", "guest", "fred", "/test");
         }
         catch (Exception e)
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java Tue Jan 16 04:13:19 2007
@@ -108,7 +108,7 @@
             JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
 
             mm.setString("value", null);
-            char c = mm.getChar("value");
+            mm.getChar("value");
             fail("Expected NullPointerException");
 
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Tue Jan 16 04:13:19 2007
@@ -89,10 +89,10 @@
 
         StreamMessage msg2 = (StreamMessage) consumer.receive();
 
-        byte b1 = msg2.readByte();
+        msg2.readByte();
         try
         {
-            byte b2 = msg2.readByte();
+            msg2.readByte();
         }
         catch (Exception e)
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Tue Jan 16 04:13:19 2007
@@ -260,7 +260,7 @@
         TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         try
         {
-            MessageConsumer consumer2 = session2.createConsumer(topic);
+            session2.createConsumer(topic);
             fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
         }
         catch (JMSException je)

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java Tue Jan 16 04:13:19 2007
@@ -23,6 +23,8 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.cluster.util.LogMessage;
 
+import java.util.List;
+
 class ClusteredSubscriptionManager extends SubscriptionSet
 {
     private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -80,6 +82,11 @@
         public int getWeight()
         {
             return ClusteredSubscriptionManager.this.getWeight();
+        }
+
+        public List<Subscription> getSubscriptions()
+        {
+            return ClusteredSubscriptionManager.super.getSubscriptions();
         }
 
         public boolean hasActiveSubscribers()