You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/26 22:04:39 UTC

svn commit: r490372 [2/3] - in /incubator/qpid/branches/new_persistence/java: ./ broker/ broker/etc/ broker/src/main/grammar/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/filter/ broker/src/main/java/org/apac...

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,8 @@
 
 import org.apache.qpid.AMQException;
 
+import java.util.Queue;
+
 public interface Subscription
 {
     void send(AMQMessage msg, AMQQueue queue) throws AMQException;
@@ -29,4 +31,18 @@
     boolean isSuspended();
 
     void queueDeleted(AMQQueue queue) throws AMQException;
+
+    boolean hasFilters();
+
+    boolean hasInterest(AMQMessage msg);
+
+    Queue<AMQMessage> getPreDeliveryQueue();
+
+    void enqueueForPreDelivery(AMQMessage msg);
+
+    boolean isAutoClose();
+
+    void close();
+
+    boolean isBrowser();   
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
 
 /**
  * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,9 +33,10 @@
  */
 public interface SubscriptionFactory
 {
-    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
-        throws AMQException;
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+                                    FieldTable filters, boolean noLocal) throws AMQException;
 
-    Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
-        throws AMQException;
+
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+            throws AMQException;
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 26 13:04:28 2006
@@ -22,8 +22,21 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Queue;
 
 /**
  * Encapsulation of a supscription to a queue.
@@ -44,23 +57,30 @@
 
     private final Object sessionKey;
 
+    private Queue<AMQMessage> _messages;
+
+    private final boolean _noLocal;
+
     /**
      * True if messages need to be acknowledged
      */
     private final boolean _acks;
+    private FilterManager _filters;
+    private final boolean _isBrowser;
+    private final Boolean _autoClose;
+    private boolean _closed = false;
 
     public static class Factory implements SubscriptionFactory
     {
-        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
-                throws AMQException
+        public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
         }
 
         public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
         }
     }
 
@@ -68,6 +88,13 @@
                             String consumerTag, boolean acks)
             throws AMQException
     {
+        this(channelId, protocolSession, consumerTag, acks, null, false);
+    }
+
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+            throws AMQException
+    {
         AMQChannel channel = protocolSession.getChannel(channelId);
         if (channel == null)
         {
@@ -79,8 +106,61 @@
         this.consumerTag = consumerTag;
         sessionKey = protocolSession.getKey();
         _acks = acks;
+        _noLocal = noLocal;
+
+        _filters = FilterManagerFactory.createManager(filters);
+
+
+        if (_filters != null)
+        {
+            Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+            if (isBrowser != null)
+            {
+                _isBrowser = (Boolean) isBrowser;
+            }
+            else
+            {
+                _isBrowser = false;
+            }
+        }
+        else
+        {
+            _isBrowser = false;
+        }
+
+
+        if (_filters != null)
+        {
+            Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+            if (autoClose != null)
+            {
+                _autoClose = (Boolean) autoClose;
+            }
+            else
+            {
+                _autoClose = false;
+            }
+        }
+        else
+        {
+            _autoClose = false;
+        }
+
+
+        if (_filters != null)
+        {
+            _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
+        }
+        else
+        {
+            // Reference the DeliveryManager
+            _messages = null;
+        }
     }
 
+
     public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
                             String consumerTag)
             throws AMQException
@@ -125,6 +205,44 @@
     {
         if (msg != null)
         {
+            if (_isBrowser)
+            {
+                sendToBrowser(msg, queue);
+            }
+            else
+            {
+                sendToConsumer(msg, queue);
+            }
+        }
+        else
+        {
+            _logger.error("Attempt to send Null message", new NullPointerException());
+        }
+    }
+
+    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+    {
+        // We don't decrement the reference here as we don't want to consume the message
+        // but we do want to send it to the client.
+
+        synchronized(channel)
+        {
+            long deliveryTag = channel.getNextDeliveryTag();
+
+            // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+            // received the message. If it is lost in transit that is not important.
+            if (_acks)
+            {
+                channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+            }
+            msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+        }
+    }
+
+    private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws AMQException
+    {
+        try
+        {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
 
@@ -150,9 +268,9 @@
                 msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
             }
         }
-        else
+        finally
         {
-            _logger.error("Attempt to send Null message", new NullPointerException());
+            msg.setDeliveredToConsumer();
         }
     }
 
@@ -169,5 +287,111 @@
     public void queueDeleted(AMQQueue queue) throws AMQException
     {
         channel.queueDeleted(queue);
+    }
+
+    public boolean hasFilters()
+    {
+        return _filters != null;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        if (_noLocal)
+        {
+            // We don't want local messages so check to see if message is one we sent
+            if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+                    msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+            {
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+                                  System.identityHashCode(msg) + ")");
+                }
+                return false;
+            }
+            else // if not then filter the message.
+            {
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+                                  ") but not ours so filtering");
+                }
+                return checkFilters(msg);
+            }
+        }
+        else
+        {
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+            }
+            return checkFilters(msg);
+        }
+    }
+
+    private boolean checkFilters(AMQMessage msg)
+    {
+        if (_filters != null)
+        {
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+            }
+            return _filters.allAllow(msg);
+        }
+        else
+        {
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+            }
+
+            return true;
+        }
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return _messages;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        if (_messages != null)
+        {
+            _messages.offer(msg);
+        }
+    }
+
+    public boolean isAutoClose()
+    {
+        return _autoClose;
+    }
+
+    public void close()
+    {
+        if (!_closed)
+        {
+            _logger.info("Closing autoclose subscription:" + this);
+            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+            _closed = true;
+        }
+    }
+
+    public boolean isBrowser()
+    {
+        return _isBrowser;
+    }
+
+
+    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+    {
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
+                                                                deliveryTag, false, exchange,
+                                                                routingKey);
+        ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+        deliverFrame.writePayload(buf);
+        buf.flip();
+        return buf;
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 26 13:04:28 2006
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.List;
+
 /**
  * Abstraction of actor that will determine the subscriber to whom
  * a message will be sent.
  */
 public interface SubscriptionManager
 {
+    public List<Subscription> getSubscriptions();
     public boolean hasActiveSubscribers();
     public Subscription nextSubscriber(AMQMessage msg);
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 26 13:04:28 2006
@@ -60,6 +60,7 @@
 
     /**
      * Remove the subscription, returning it if it was found
+     *
      * @param subscription
      * @return null if no match was found
      */
@@ -92,7 +93,7 @@
 
     /**
      * Return the next unsuspended subscription or null if not found.
-     *
+     * <p/>
      * Performance note:
      * This method can scan all items twice when looking for a subscription that is not
      * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -107,31 +108,51 @@
             return null;
         }
 
-        try {
-            final Subscription result = nextSubscriber();
-            if (result == null) {
+        try
+        {
+            final Subscription result = nextSubscriberImpl(msg);
+            if (result == null)
+            {
                 _currentSubscriber = 0;
-                return nextSubscriber();
-            } else {
+                return nextSubscriberImpl(msg);
+            }
+            else
+            {
                 return result;
             }
-        } catch (IndexOutOfBoundsException e) {
+        }
+        catch (IndexOutOfBoundsException e)
+        {
             _currentSubscriber = 0;
-            return nextSubscriber();
+            return nextSubscriber(msg);
         }
     }
 
-    private Subscription nextSubscriber()
+    private Subscription nextSubscriberImpl(AMQMessage msg)
     {
         final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
-        while (iterator.hasNext()) {
+        while (iterator.hasNext())
+        {
             Subscription subscription = iterator.next();
             ++_currentSubscriber;
             subscriberScanned();
-            if (!subscription.isSuspended()) {
-                return subscription;
+
+            if (!subscription.isSuspended())
+            {
+                if (subscription.hasInterest(msg))
+                {
+                    // A subscription without filters, or one whose pre-delivery queue is empty, is ready to receive a message.
+                    //FIXME the queue could be full of sent messages.
+                    // Either need to clean all PDQs after sending a message
+                    // OR have a clean up thread that runs the PDQs expunging the messages.
+                    if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+                    {
+                        return subscription;
+                    }
+                }
             }
         }
+
         return null;
     }
 
@@ -147,11 +168,19 @@
         return _subscriptions.isEmpty();
     }
 
+    public List<Subscription> getSubscriptions()
+    {
+        return _subscriptions;
+    }
+
     public boolean hasActiveSubscribers()
     {
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) return true;
+            if (!s.isSuspended())
+            {
+                return true;
+            }
         }
         return false;
     }
@@ -161,7 +190,10 @@
         int count = 0;
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) count++;
+            if (!s.isSuspended())
+            {
+                count++;
+            }
         }
         return count;
     }
@@ -169,6 +201,7 @@
     /**
      * Notification that a queue has been deleted. This is called so that the subscription can inform the
      * channel, which in turn can update its list of unacknowledged messages.
+     *
      * @param queue
      */
     public void queueDeleted(AMQQueue queue) throws AMQException
@@ -179,7 +212,8 @@
         }
     }
 
-    int size() {
+    int size()
+    {
         return _subscriptions.size();
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -35,7 +35,7 @@
  */
 class SynchronizedDeliveryManager implements DeliveryManager
 {
-    private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+    private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
 
     /**
      * Holds any queued messages
@@ -124,6 +124,11 @@
         return new ArrayList<AMQMessage>(_messages);
     }
 
+    public void populatePreDeliveryQueue(Subscription subscription)
+    {
+        // no-op: this DeliveryManager has no PreDeliveryQueues
+    }
+
     public synchronized void removeAMessageFromTop() throws AMQException
     {
         AMQMessage msg = poll();
@@ -245,7 +250,6 @@
                 else
                 {
                     s.send(msg, _queue);
-                    msg.setDeliveredToConsumer();
                 }
             }
         }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Dec 26 13:04:28 2006
@@ -31,6 +31,7 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * @author Apache Software Foundation
@@ -49,6 +50,8 @@
      */
     private final List<RequiredDeliveryException> _returnMessages;
 
+    private Set<Long> _browsedAcks;
+
     private final MessageStore _messageStore;
 
     /**
@@ -57,11 +60,12 @@
     private boolean _inTran;
 
     public NonTransactionalContext(MessageStore messageStore, AMQChannel channel,
-                                   List<RequiredDeliveryException> returnMessages)
+                                   List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
     {
         _channel = channel;
         _returnMessages = returnMessages;
         _messageStore = messageStore;
+        _browsedAcks = browsedAcks;
     }
 
     public void beginTranIfNecessary() throws AMQException
@@ -111,12 +115,19 @@
                 //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
                 // tells the server to acknowledge all outstanding mesages.
                 _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
-                        unacknowledgedMessageMap.size());
+                          unacknowledgedMessageMap.size());
                 unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
                 {
                     public boolean callback(UnacknowledgedMessage message) throws AMQException
                     {
-                        message.discard();
+                        if (!_browsedAcks.contains(deliveryTag))
+                        {
+                            message.discard();
+                        }
+                        else
+                        {
+                            _browsedAcks.remove(deliveryTag);
+                        }
                         return false;
                     }
 
@@ -137,7 +148,14 @@
                 unacknowledgedMessageMap.drainTo(acked, deliveryTag);
                 for (UnacknowledgedMessage msg : acked)
                 {
-                    msg.discard();
+                    if (!_browsedAcks.contains(deliveryTag))
+                    {
+                        msg.discard();
+                    }
+                    else
+                    {
+                        _browsedAcks.remove(deliveryTag);
+                    }
                 }
             }
         }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java Tue Dec 26 13:04:28 2006
@@ -20,10 +20,15 @@
  */
 package org.apache.qpid.server.util;
 
+import org.apache.log4j.Logger;
+
 import java.util.Iterator;
 
 public class CircularBuffer implements Iterable
 {
+
+    private static final Logger _logger = Logger.getLogger(CircularBuffer.class);
+
     private final Object[] _log;
     private int _size;
     private int _index;
@@ -102,7 +107,7 @@
     {
         for(Object o : this)
         {
-         System.out.println(o);   
+         _logger.info(o);
         }
     }
 
@@ -120,7 +125,7 @@
         for(String s : items)
         {
             buffer.add(s);
-            System.out.println(buffer);
+            _logger.info(buffer);
         }
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/pom.xml?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/client/pom.xml Tue Dec 26 13:04:28 2006
@@ -35,7 +35,6 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
-        <amqj.logging.level>warn</amqj.logging.level>
     </properties>
 
     <dependencies>
@@ -96,6 +95,11 @@
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

Modified: incubator/qpid/branches/new_persistence/java/client/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/log4j.properties?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/log4j.properties (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/log4j.properties Tue Dec 26 13:04:28 2006
@@ -1,10 +1,10 @@
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=${amqj.logging.level}
 
 
 log4j.logger.org.apache.qpid=${amqj.logging.level}, console
 log4j.additivity.org.apache.qpid=false
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=info
+log4j.appender.console.Threshold=debug
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties Tue Dec 26 13:04:28 2006
@@ -16,10 +16,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-log4j.rootLogger=WARN
+log4j.rootLogger=${amqj.logging.level}
 
 
-log4j.logger.org.apache.qpid=WARN, console
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
 log4j.additivity.org.apache.qpid=false
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Dec 26 13:04:28 2006
@@ -23,13 +23,15 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -69,15 +71,15 @@
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
     /**
-     *  Used to reference durable subscribers so they requests for unsubscribe can be handled
-     *  correctly.  Note this only keeps a record of subscriptions which have been created
-     *  in the current instance.  It does not remember subscriptions between executions of the
-     *  client
+     * Used to reference durable subscribers so that requests for unsubscribe can be handled
+     * correctly.  Note this only keeps a record of subscriptions which have been created
+     * in the current instance.  It does not remember subscriptions between executions of the
+     * client
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-                new ConcurrentHashMap<BasicMessageConsumer, String>();
+            new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
      * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -143,6 +145,7 @@
     private boolean _inRecovery;
 
 
+
     /**
      * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
      */
@@ -176,7 +179,7 @@
         {
             if (message.deliverBody != null)
             {
-                final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
 
                 if (consumer == null)
                 {
@@ -210,17 +213,15 @@
                     {
                         _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
                     }
+                    else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+                    {
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                    }
                     else
                     {
-                        if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                        {
-                            _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                        }
-                        else
-                        {
-                            _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                        }
+                        _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
                     }
+
                 }
                 catch (Exception e)
                 {
@@ -318,7 +319,7 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -334,7 +335,7 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -350,7 +351,7 @@
 
     public javax.jms.Message createMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -366,7 +367,7 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -382,7 +383,7 @@
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -400,7 +401,7 @@
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -417,7 +418,7 @@
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -434,7 +435,7 @@
 
     public TextMessage createTextMessage(String text) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -504,7 +505,7 @@
     {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             //Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
@@ -569,7 +570,7 @@
      */
     public void closed(Throwable e)
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will be passed in when closure occurs as a
             // result of a channel close request
@@ -721,11 +722,11 @@
 
     public void acknowledge() throws JMSException
     {
-        if(isClosed())
+        if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
         }
-        for(BasicMessageConsumer consumer : _consumers.values())
+        for (BasicMessageConsumer consumer : _consumers.values())
         {
             consumer.acknowledge();
         }
@@ -734,7 +735,6 @@
     }
 
 
-
     public MessageListener getMessageListener() throws JMSException
     {
         checkNotClosed();
@@ -843,7 +843,9 @@
                                   false,
                                   false,
                                   null,
-                                  null);
+                                  null,
+                                  false,
+                                  false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@
                                   false,
                                   false,
                                   messageSelector,
-                                  null);
+                                  null,
+                                  false,
+                                  false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@
                                   noLocal,
                                   false,
                                   messageSelector,
-                                  null);
+                                  null,
+                                  false,
+                                  false);
+    }
+
+    public MessageConsumer createBrowserConsumer(Destination destination,
+                                         String messageSelector,
+                                         boolean noLocal)
+            throws JMSException
+    {
+        checkValidDestination(destination);
+        return createConsumerImpl(destination,
+                                  _defaultPrefetchHighMark,
+                                  _defaultPrefetchLowMark,
+                                  noLocal,
+                                  false,
+                                  messageSelector,
+                                  null,
+                                  true,
+                                  true);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
     }
 
 
@@ -890,7 +913,7 @@
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@
     {
         checkValidDestination(destination);
         return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
-                                  selector, rawSelector);
+                                  selector, rawSelector, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@
     {
         checkValidDestination(destination);
         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
-                                  selector, rawSelector);
+                                  selector, rawSelector, false, false);
     }
 
     protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@
                                                  final boolean noLocal,
                                                  final boolean exclusive,
                                                  final String selector,
-                                                 final FieldTable rawSelector) throws JMSException
+                                                 final FieldTable rawSelector,
+                                                 final boolean noConsume,
+                                                 final boolean autoClose) throws JMSException
     {
         checkTemporaryDestination(destination);
 
@@ -948,12 +973,18 @@
                 BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
                                                                          _messageFactoryRegistry, AMQSession.this,
                                                                          protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
-                                                                         _acknowledgeMode);
+                                                                         _acknowledgeMode, noConsume, autoClose);
 
                 try
                 {
                     registerConsumer(consumer, false);
                 }
+                catch (AMQInvalidSelectorException ise)
+                {
+                    JMSException ex = new InvalidSelectorException(ise.getMessage());
+                    ex.setLinkedException(ise);
+                    throw ex;
+                }
                 catch (AMQException e)
                 {
                     JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +994,7 @@
 
                 synchronized(destination)
                 {
-                    _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+                    _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
                     _destinationConsumerCount.get(destination).incrementAndGet();
                 }
 
@@ -975,16 +1006,16 @@
     private void checkTemporaryDestination(Destination destination)
             throws JMSException
     {
-        if((destination instanceof TemporaryDestination))
+        if ((destination instanceof TemporaryDestination))
         {
             _logger.debug("destination is temporary");
             final TemporaryDestination tempDest = (TemporaryDestination) destination;
-            if(tempDest.getSession() != this)
+            if (tempDest.getSession() != this)
             {
                 _logger.debug("destination is on different session");
                 throw new JMSException("Cannot consume from a temporary destination created onanother session");
             }
-            if(tempDest.isDeleted())
+            if (tempDest.isDeleted())
             {
                 _logger.debug("destination is deleted");
                 throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1096,26 @@
      * @return the consumer tag generated by the broker
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
-                                  boolean nowait) throws AMQException
+                                  boolean nowait, String messageSelector) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         String tag = Integer.toString(_nextTag++);
 
+        FieldTable arguments = FieldTableFactory.newFieldTable();
+        if (messageSelector != null && !messageSelector.equals(""))
+        {
+            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+        }
+        if(consumer.isAutoClose())
+        {
+            arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+        }
+        if(consumer.isNoConsume())
+        {
+            arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+        }
+
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
@@ -1080,7 +1125,7 @@
             AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
                                                                   queueName, tag, consumer.isNoLocal(),
                                                                   consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                                                                  consumer.isExclusive(), nowait, null);
+                                                                  consumer.isExclusive(), nowait, arguments);
             if (nowait)
             {
                 protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1265,7 @@
     {
         checkNotClosed();
         checkValidTopic(topic);
-        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
         if (subscriber != null)
         {
@@ -1247,8 +1292,8 @@
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
 
-        _subscriptions.put(name,subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
         return subscriber;
     }
@@ -1278,8 +1323,8 @@
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-        _subscriptions.put(name,subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
         return subscriber;
     }
 
@@ -1291,16 +1336,14 @@
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
-        checkNotClosed();
-        checkValidQueue(queue);
-        throw new UnsupportedOperationException("Queue browsing not supported");
+        return createBrowser(queue, null);
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
         checkNotClosed();
         checkValidQueue(queue);
-        throw new UnsupportedOperationException("Queue browsing not supported");
+        return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1476,7 +1519,14 @@
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
-        consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+        try
+        {
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+        }
+        catch (JMSException e) //thrown by getMessageSelector
+        {
+            throw new AMQException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -1489,7 +1539,7 @@
     {
         _consumers.remove(consumer.getConsumerTag());
         String subscriptionName = _reverseSubscriptionMap.remove(consumer);
-        if(subscriptionName != null)
+        if (subscriptionName != null)
         {
             _subscriptions.remove(subscriptionName);
         }
@@ -1497,7 +1547,7 @@
         Destination dest = consumer.getDestination();
         synchronized(dest)
         {
-            if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
             {
                 _destinationConsumerCount.remove(dest);
             }
@@ -1567,6 +1617,16 @@
         _connection.getProtocolHandler().writeFrame(channelFlowFrame);
     }
 
+    public void confirmConsumerCancelled(String consumerTag)
+    {
+        BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+        if((consumer != null) && (consumer.isAutoClose()))
+        {
+            consumer.closeWhenNoMessages(true);
+        }
+    }
+
+
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
@@ -1576,7 +1636,7 @@
         {
             throw new javax.jms.InvalidDestinationException("Invalid Topic");
         }
-        if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+        if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
         {
             throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
         }
@@ -1597,4 +1657,5 @@
             throw new javax.jms.InvalidDestinationException("Invalid Queue");
         }
     }
+
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Dec 26 13:04:28 2006
@@ -145,10 +145,19 @@
      */
     private Thread _receivingThread;
 
+    /**
+     *  autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+     *  on the queue.  This is used for queue browsing.
+     */
+    private boolean _autoClose;
+    private boolean _closeWhenNoMessages;
+
+    private boolean _noConsume;
+
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
-                         boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-                         AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
-                         int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+                                   boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+                                   AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+                                   int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
         _connection = connection;
@@ -164,6 +173,8 @@
         _exclusive = exclusive;
         _acknowledgeMode = acknowledgeMode;
         _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+        _autoClose = autoClose;
+        _noConsume = noConsume;
     }
 
     public AMQDestination getDestination()
@@ -321,6 +332,10 @@
 
         try
         {
+            if(closeOnAutoClose())
+            {
+                return null;
+            }
             Object o = null;
             if (l > 0)
             {
@@ -350,6 +365,19 @@
         }
     }
 
+    private boolean closeOnAutoClose() throws JMSException
+    {
+        if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+        {
+            close(false);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
     public Message receiveNoWait() throws JMSException
     {
     	checkPreConditions();
@@ -358,6 +386,10 @@
 
         try
         {
+            if(closeOnAutoClose())
+            {
+                return null;
+            }
             Object o = _synchronousQueue.poll();
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
@@ -402,22 +434,31 @@
         }
     }
 
+
     public void close() throws JMSException
     {
+        close(true);
+    }
+
+    public void close(boolean sendClose) throws JMSException
+    {
         synchronized(_connection.getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
-                final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
-                try
-                {
-                    _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
-                }
-                catch (AMQException e)
+                if(sendClose)
                 {
-                    _logger.error("Error closing consumer: " + e, e);
-                    throw new JMSException("Error closing consumer: " + e);
+                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+                    try
+                    {
+                        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+                    }
+                    catch (AMQException e)
+                    {
+                        _logger.error("Error closing consumer: " + e, e);
+                        throw new JMSException("Error closing consumer: " + e);
+                    }
                 }
 
                 deregisterConsumer();
@@ -513,6 +554,12 @@
     	msg.setJMSDestination(_destination);
         switch (_acknowledgeMode)
         {
+            case Session.CLIENT_ACKNOWLEDGE:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                break;
             case Session.DUPS_OK_ACKNOWLEDGE:
                 if (++_outstanding >= _prefetchHigh)
                 {
@@ -539,7 +586,14 @@
                 }
                 break;
             case Session.SESSION_TRANSACTED:
-                _lastDeliveryTag = msg.getDeliveryTag();
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    _lastDeliveryTag = msg.getDeliveryTag();
+                }
                 break;
         }
     }
@@ -629,5 +683,30 @@
     public void clearUnackedMessages()
     {
         _unacknowledgedDeliveryTags.clear();
+    }
+
+    public boolean isAutoClose()
+    {
+        return _autoClose;
+    }
+
+
+    public boolean isNoConsume()
+    {
+        return _noConsume;
+    }
+
+    public void closeWhenNoMessages(boolean b)
+    {
+        _closeWhenNoMessages = b;
+
+        if(_closeWhenNoMessages
+                && _synchronousQueue.isEmpty() 
+                && _receiving.get()
+                && _messageListener != null)
+        {
+            _receivingThread.interrupt();
+        }
+
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Tue Dec 26 13:04:28 2006
@@ -20,16 +20,16 @@
  */
 package org.apache.qpid.client;
 
-import java.util.Enumeration;
+import org.apache.qpid.common.QpidProperties;
 
 import javax.jms.ConnectionMetaData;
 import javax.jms.JMSException;
+import java.util.Enumeration;
 
 public class QpidConnectionMetaData implements ConnectionMetaData
 {
 
 
-
     QpidConnectionMetaData(AMQConnection conn)
     {
     }
@@ -46,7 +46,7 @@
 
     public String getJMSProviderName() throws JMSException
     {
-        return "Apache Qpid";
+        return "Apache " + QpidProperties.getProductName();
     }
 
     public String getJMSVersion() throws JMSException
@@ -71,8 +71,8 @@
 
     public String getProviderVersion() throws JMSException
     {
-        return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
-                + getProtocolVersion() + "] )";
+        return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+               + getProtocolVersion() + "] )";
     }
 
     private String getProtocolVersion()
@@ -89,8 +89,7 @@
 
     public String getClientVersion()
     {
-        // TODO - get client build version from properties file or similar
-        return "<unknown>";
+        return QpidProperties.getBuildVersion();
     }
 
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@
 
     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
-         _logger.debug("ChannelClose method received");
+        _logger.debug("ChannelClose method received");
         ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
 
         int errorCode = method.replyCode;
@@ -65,17 +66,21 @@
             {
                 throw new AMQNoConsumersException("Error: " + reason, null);
             }
+            else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+            {
+                throw new AMQNoRouteException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+            {
+                _logger.info("Broker responded with Invalid Selector.");
+
+                throw new AMQInvalidSelectorException(reason);
+            }
             else
             {
-                if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                {
-                   throw new AMQNoRouteException("Error: " + reason, null);
-                }
-                else
-                {
-                    throw new AMQChannelClosedException(errorCode, "Error: " + reason);
-                }
+                throw new AMQChannelClosedException(errorCode, "Error: " + reason);
             }
+
         }
         evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
     }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,8 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,16 +121,22 @@
 
             stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
             FieldTable clientProperties = FieldTableFactory.newFieldTable();
-            clientProperties.put("instance", ps.getClientID());
-            clientProperties.put("product", "Qpid");
-            clientProperties.put("version", "1.0");
-            clientProperties.put("platform", getFullSystemInfo());
+            
+            clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+            _log.info("Product name: " + QpidProperties.getProductName());
+            clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+            clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
+            clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
             ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
                                                                saslResponse, selectedLocale));
         }
         catch (UnsupportedEncodingException e)
         {
             throw new AMQException(_log, "Unable to decode data: " + e, e);
+        }
+        catch (Throwable t)
+        {
+            _log.error("Error: " + t, t);
         }
     }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Dec 26 13:04:28 2006
@@ -26,7 +26,8 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 
@@ -46,7 +47,8 @@
 
     protected ByteBuffer _data;
     private boolean _readableProperties = false;
-    private boolean _readableMessage = false;
+    protected boolean _readableMessage = false;
+    protected boolean _changedData;
     private Destination _destination;
     private BasicMessageConsumer _consumer;
 
@@ -60,6 +62,7 @@
         }
         _readableProperties = false;
         _readableMessage = (data != null);
+        _changedData = (data == null);
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -521,16 +524,16 @@
         return !_readableMessage;
     }
 
-    public void reset() 
+    public void reset()
     {
-        if (_readableMessage)
+        if (!_changedData)
         {
             _data.rewind();
         }
         else
         {
             _data.flip();
-            _readableMessage = true;
+            _changedData = false;
         }
     }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Tue Dec 26 13:04:28 2006
@@ -59,6 +59,12 @@
         super(messageNbr, contentHeader, data);
     }
 
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
     public String getMimeType()
     {
         return MIME_TYPE;
@@ -226,48 +232,56 @@
     public void writeBoolean(boolean b) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.put(b ? (byte) 1 : (byte) 0);
     }
 
     public void writeByte(byte b) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.put(b);
     }
 
     public void writeShort(short i) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putShort(i);
     }
 
     public void writeChar(char c) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putChar(c);
     }
 
     public void writeInt(int i) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putInt(i);
     }
 
     public void writeLong(long l) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putLong(l);
     }
 
     public void writeFloat(float v) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putFloat(v);
     }
 
     public void writeDouble(double v) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putDouble(v);
     }
 
@@ -281,7 +295,7 @@
             
             _data.putShort((short)encodedString.limit());
             _data.put(encodedString);
-
+            _changedData = true;
             //_data.putString(string, Charset.forName("UTF-8").newEncoder());
             // we must add the null terminator manually
             //_data.put((byte)0);
@@ -298,12 +312,14 @@
     {
         checkWritable();
         _data.put(bytes);
+        _changedData = true;
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
     {
         checkWritable();
         _data.put(bytes, offset, length);
+        _changedData = true;
     }
 
     public void writeObject(Object object) throws JMSException

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Tue Dec 26 13:04:28 2006
@@ -112,7 +112,7 @@
         }
 
     }
-
+  
     public Serializable getObject() throws JMSException
     {
         ObjectInputStream in = null;
@@ -123,18 +123,18 @@
 
         try
         {
-        	_data.rewind();
+            _data.rewind();
             in = new ObjectInputStream(_data.asInputStream());
             return (Serializable) in.readObject();
         }
         catch (IOException e)
-        {           
-           e.printStackTrace();
-           throw new MessageFormatException("Could not deserialize message: " + e);
+        {
+            e.printStackTrace();
+            throw new MessageFormatException("Could not deserialize message: " + e);
         }
         catch (ClassNotFoundException e)
         {
-        	e.printStackTrace();
+            e.printStackTrace();
             throw new MessageFormatException("Could not deserialize message: " + e);
         }
         finally

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Tue Dec 26 13:04:28 2006
@@ -86,6 +86,12 @@
         super(messageNbr, contentHeader, data);
     }
 
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
     public String getMimeType()
     {
         return MIME_TYPE;
@@ -103,6 +109,7 @@
     {
         checkWritable();
         _data.put(type);
+        _changedData = true;
     }
 
     public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@
             {
                 _data.putString(string, Charset.forName("UTF-8").newEncoder());
                 // we must write the null terminator ourselves
-                _data.put((byte)0);
+                _data.put((byte) 0);
             }
             catch (CharacterCodingException e)
             {
@@ -706,7 +713,7 @@
 
     public void writeBytes(byte[] bytes) throws JMSException
     {
-        writeBytes(bytes, 0, bytes == null?0:bytes.length);
+        writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Tue Dec 26 13:04:28 2006
@@ -117,6 +117,7 @@
                 {
                     _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
                 }
+                _changedData=true;
             }
             _decodedValue = text;
         }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Dec 26 13:04:28 2006
@@ -406,4 +406,12 @@
             HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
+
+    public void confirmConsumerCancelled(int channelId, String consumerTag)
+    {
+        final Integer chId = channelId;
+        final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+        session.confirmConsumerCancelled(consumerTag);
+    }
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Dec 26 13:04:28 2006
@@ -110,7 +110,7 @@
             }
             else
             {
-                throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+                throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
             }
         }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Dec 26 13:04:28 2006
@@ -103,6 +103,7 @@
         frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
         frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
         frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+        frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
         frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
         frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
         frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Dec 26 13:04:28 2006
@@ -59,7 +59,7 @@
         // once more testing of the performance of the simple allocator has been done
         if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
         {
-            _logger.warn("Using SimpleByteBufferAllocator");
+            _logger.info("Using SimpleByteBufferAllocator");
             ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
         }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Tue Dec 26 13:04:28 2006
@@ -40,11 +40,15 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
 
 public class PropertiesFileInitialContextFactory implements InitialContextFactory
 {
-    protected final Logger _logger = Logger.getLogger(getClass());
+    protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class);
 
     private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
     private String DESTINATION_PREFIX = "destination.";
@@ -54,6 +58,41 @@
     public Context getInitialContext(Hashtable environment) throws NamingException
     {
         Map data = new ConcurrentHashMap();
+
+        try
+        {
+            // Hashtable.contains() tests values, not keys; containsKey() is the lookup needed here.
+            String file = environment.containsKey(Context.PROVIDER_URL)
+                          ? (String) environment.get(Context.PROVIDER_URL)
+                          : System.getProperty(Context.PROVIDER_URL);
+
+            if (file != null)
+            {
+                _logger.info("Loading Properties from:" + file);
+                //Load the properties specified, closing the stream even if load() fails
+                Properties p = new Properties();
+                BufferedInputStream in = new BufferedInputStream(new FileInputStream(file));
+                try
+                {
+                    p.load(in);
+                }
+                finally
+                {
+                    in.close();
+                }
+                environment.putAll(p);
+                _logger.info("Loaded Context Properties:" + environment.toString());
+            }
+            else
+            {
+                _logger.warn("No Provider URL specified.");
+            }
+        }
+        catch (IOException ioe)
+        {
+            _logger.warn("Unable to load property file specified in Provider_URL:" +
+                         environment.get(Context.PROVIDER_URL));
+        }
 
         createConnectionFactories(data, environment);
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java Tue Dec 26 13:04:28 2006
@@ -29,6 +29,7 @@
 import javax.jms.*;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import javax.jms.Message;
 import java.util.Hashtable;
 import java.io.File;
 import java.io.FilenameFilter;

Modified: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Dec 26 13:04:28 2006
@@ -116,7 +116,9 @@
             m.setIntProperty("Int", (int) Integer.MAX_VALUE);
 
             m.setJMSCorrelationID("Correlation");
-            m.setJMSPriority(100);
+            //FIXME: m.setJMSPriority(3) has no effect; the test expects the producer's priority (8) to be transported
+            producer.setPriority(8);
+            m.setJMSPriority(3);
 
             //  Queue
             Queue q;
@@ -182,10 +184,8 @@
                                 (int) Integer.MAX_VALUE, m.getIntProperty("Int"));
             Assert.assertEquals("Check CorrelationID properties are correctly transported",
                                 "Correlation", m.getJMSCorrelationID());
-
-            _logger.warn("getJMSPriority not being verified.");
-//            Assert.assertEquals("Check Priority properties are correctly transported",
-//                                100, m.getJMSPriority());
+            Assert.assertEquals("Check Priority properties are correctly transported",
+                                8, m.getJMSPriority());
 
             // Queue
             Assert.assertEquals("Check ReplyTo properties are correctly transported",

Modified: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Tue Dec 26 13:04:28 2006
@@ -121,7 +121,7 @@
     {
         if (_connection != null)
         {
-            System.out.println(">>>>>>>>>>>>>>.. closing");
+            _log.info(">>>>>>>>>>>>>>.. closing");
             _connection.close();
         }
     }
@@ -137,7 +137,7 @@
         {
             public void onException(JMSException jmsException)
             {
-                _log.error("onException - ", jmsException);
+                _log.warn("onException - "+jmsException.getMessage());
             }
         });
 

Modified: incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java Tue Dec 26 13:04:28 2006
@@ -20,6 +20,8 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.cluster.util.LogMessage;
 
+import java.util.List;
+
 class ClusteredSubscriptionManager extends SubscriptionSet
 {
     private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -77,6 +79,11 @@
         public int getWeight()
         {
             return ClusteredSubscriptionManager.this.getWeight();
+        }
+
+        public List<Subscription> getSubscriptions()
+        {
+            return ClusteredSubscriptionManager.super.getSubscriptions();
         }
 
         public boolean hasActiveSubscribers()

Modified: incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java Tue Dec 26 13:04:28 2006
@@ -18,12 +18,12 @@
 package org.apache.qpid.server.queue;
 
 import java.util.List;
+import java.util.LinkedList;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Distributes messages among a list of subsscription managers, using their
  * weighting.
- *
  */
 class NestedSubscriptionManager implements SubscriptionManager
 {
@@ -41,11 +41,24 @@
         _subscribers.remove(s);
     }
 
+
+    public List<Subscription> getSubscriptions()
+    {
+        List<Subscription> allSubs = new LinkedList<Subscription>();
+
+        for (WeightedSubscriptionManager subMans : _subscribers)
+        {
+            allSubs.addAll(subMans.getSubscriptions());
+        }
+
+        return allSubs;
+    }
+
     public boolean hasActiveSubscribers()
     {
-        for(WeightedSubscriptionManager s : _subscribers)
+        for (WeightedSubscriptionManager s : _subscribers)
         {
-            if(s.hasActiveSubscribers())
+            if (s.hasActiveSubscribers())
             {
                 return true;
             }
@@ -56,9 +69,9 @@
     public Subscription nextSubscriber(AMQMessage msg)
     {
         WeightedSubscriptionManager start = current();
-        for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+        for (WeightedSubscriptionManager s = start; s != null; s = next(start))
         {
-            if(hasMore(s))
+            if (hasMore(s))
             {
                 return nextSubscriber(s);
             }
@@ -91,7 +104,7 @@
     private WeightedSubscriptionManager next()
     {
         _iterations = 0;
-        if(++_index >= _subscribers.size())
+        if (++_index >= _subscribers.size())
         {
             _index = 0;
         }

Modified: incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,9 @@
 import org.apache.qpid.server.cluster.SimpleSendable;
 import org.apache.qpid.AMQException;
 
+import java.util.Queue;
+import java.util.List;
+
 class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
 {
     private final GroupManager _groupMgr;
@@ -73,6 +76,11 @@
         return _count;
     }
 
+    public List<Subscription> getSubscriptions()
+    {
+        return java.util.Collections.<Subscription>emptyList();  // never null: NestedSubscriptionManager passes this result to List.addAll()
+    }
+
     public boolean hasActiveSubscribers()
     {
         return getWeight() == 0;
@@ -85,9 +93,49 @@
 
     public void queueDeleted(AMQQueue queue)
     {
-        if(queue instanceof ClusteredQueue)
+        if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
         }
+    }
+
+    public boolean hasFilters()
+    {
+        return false;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return true;
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return null;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+    }
+
+    public boolean isAutoClose()
+    {
+        return false;
+    }
+
+    public void close()
+    {
+        //no-op
+    }
+
+    public boolean isBrowser()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void sendNextMessage(AMQQueue queue)
+    {
+
     }
 }