You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/28 13:18:14 UTC

svn commit: r631938 [2/5] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/exchange/ java/broker/src/main/java/org/apache/qpid/ser...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Feb 28 04:16:41 2008
@@ -208,27 +208,10 @@
         {
             _logger.debug("Frame Received: " + frame);
         }
+                
+
+        body.handle(channelId, this);
 
-        if (body instanceof AMQMethodBody)
-        {
-            methodFrameReceived(channelId, (AMQMethodBody) body);
-        }
-        else if (body instanceof ContentHeaderBody)
-        {
-            contentHeaderReceived(channelId, (ContentHeaderBody) body);
-        }
-        else if (body instanceof ContentBody)
-        {
-            contentBodyReceived(channelId, (ContentBody) body);
-        }
-        else if (body instanceof HeartbeatBody)
-        {
-            // NO OP
-        }
-        else
-        {
-            _logger.warn("Unrecognised frame " + frame.getClass().getName());
-        }
     }
 
     private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -271,7 +254,7 @@
         }
     }
 
-    private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+    public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
     {
 
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -365,7 +348,7 @@
         }
         catch (Exception e)
         {
-            _stateManager.error(e);
+            
             for (AMQMethodListener listener : _frameListeners)
             {
                 listener.error(e);
@@ -375,7 +358,7 @@
         }
     }
 
-    private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+    public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
 
         AMQChannel channel = getAndAssertChannel(channelId);
@@ -384,11 +367,16 @@
 
     }
 
-    private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+    public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
     {
         AMQChannel channel = getAndAssertChannel(channelId);
 
         channel.publishContentBody(body, this);
+    }
+
+    public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+    {
+        // NO - OP
     }
 
     /**

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Feb 28 04:16:41 2008
@@ -32,6 +32,7 @@
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.SessionUtil;
@@ -83,7 +84,7 @@
         createSession(protocolSession, _applicationRegistry, codecFactory);
         _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
 
-        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
+        final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
 
         ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
                 getConfiguredObject(ConnectorConfiguration.class);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb 28 04:16:41 2008
@@ -102,7 +102,7 @@
     private long _expiration;
 
 
-    private final int hashcode = System.identityHashCode(this);
+
 
     private Exchange _exchange;
     private static final boolean SYNCED_CLOCKS =
@@ -111,7 +111,7 @@
 
     public String debugIdentity()
     {
-        return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
+        return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
     }
 
     public void setExpiration()
@@ -160,6 +160,11 @@
         _exchange.route(this);
     }
 
+    public void enqueue(final List<AMQQueue> queues)
+    {
+        _transientMessageData.setDestinationQueues(queues);
+    }
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -763,7 +768,7 @@
         }
         finally
         {
-            destinationQueues.clear();
+
             // Remove refence for routing process . Reference count should now == delivered queue count
             decrementReference(storeContext);
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb 28 04:16:41 2008
@@ -39,11 +39,7 @@
 
 import javax.management.JMException;
 import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -149,6 +145,8 @@
     public AtomicLong _totalMessagesReceived = new AtomicLong();
 
 
+    private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -192,6 +190,13 @@
         _subscriptionFactory = subscriptionFactory;
         _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
         _queueId = s_queueID++;
+
+        // This ensures that the notification checks for the configured alerts are created.
+        setMaximumMessageAge(_maximumMessageAge);
+        setMaximumMessageCount(_maximumMessageCount);
+        setMaximumMessageSize(_maximumMessageSize);
+        setMaximumQueueDepth(_maximumQueueDepth);
+
     }
 
     private AMQQueueMBean createMBean() throws AMQException
@@ -206,7 +211,7 @@
         }
     }
 
-    public AMQShortString getName()
+    public final AMQShortString getName()
     {
         return _name;
     }
@@ -539,9 +544,17 @@
         return _maximumMessageSize;
     }
 
-    public void setMaximumMessageSize(long value)
+    public void setMaximumMessageSize(final long maximumMessageSize)
     {
-        _maximumMessageSize = value;
+        _maximumMessageSize = maximumMessageSize;
+        if(maximumMessageSize == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+        }
     }
 
     public int getConsumerCount()
@@ -564,9 +577,20 @@
         return _maximumMessageCount;
     }
 
-    public void setMaximumMessageCount(long value)
+    public void setMaximumMessageCount(final long maximumMessageCount)
     {
-        _maximumMessageCount = value;
+        _maximumMessageCount = maximumMessageCount;
+        if(maximumMessageCount == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+        }
+
+
+
     }
 
     public long getMaximumQueueDepth()
@@ -575,9 +599,18 @@
     }
 
     // Sets the queue depth, the max queue size
-    public void setMaximumQueueDepth(long value)
+    public void setMaximumQueueDepth(final long maximumQueueDepth)
     {
-        _maximumQueueDepth = value;
+        _maximumQueueDepth = maximumQueueDepth;
+        if(maximumQueueDepth == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+        }
+
     }
 
     public long getOldestMessageArrivalTime()
@@ -682,6 +715,10 @@
         }
 
         _subscribers.addSubscriber(subscription);
+        if(exclusive)
+        {
+            _subscribers.setExclusive(true);
+        }
     }
 
     private boolean isExclusive()
@@ -713,6 +750,7 @@
                     consumerTag, this));
         }
 
+        _subscribers.setExclusive(false);
         Subscription removedSubscription;
 
         if ((removedSubscription =
@@ -827,7 +865,7 @@
     public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
     {
         AMQMessage msg = entry.getMessage();
-        _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+        _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
         try
         {
             msg.checkDeliveredToConsumer();
@@ -960,6 +998,14 @@
     public void setMaximumMessageAge(long maximumMessageAge)
     {
         _maximumMessageAge = maximumMessageAge;
+        if(maximumMessageAge == 0L)
+        {
+            _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+        }
+        else
+        {
+            _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+        }
     }
 
     public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -1040,5 +1086,10 @@
                 _deliveryMgr.removeExpired();
             }
         }
+    }
+
+    public final Set<NotificationCheck> getNotificationChecks()
+    {
+        return _notificationChecks;
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Feb 28 04:16:41 2008
@@ -54,10 +54,7 @@
 import javax.management.openmbean.TabularType;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 /**
  * MBean class for AMQQueue. It implements all the management features exposed
@@ -97,6 +94,9 @@
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
     private Notification _lastNotification = null;
 
+
+
+
     @MBeanConstructor("Creates an MBean exposing an AMQQueue")
     public AMQQueueMBean(AMQQueue queue) throws JMException
     {
@@ -249,16 +249,21 @@
     public void checkForNotification(AMQMessage msg) throws AMQException, JMException
     {
 
-        final long currentTime = System.currentTimeMillis();
-        final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+        final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
 
-        for (NotificationCheck check : NotificationCheck.values())
+        if(!notificationChecks.isEmpty())
         {
-            if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+            final long currentTime = System.currentTimeMillis();
+            final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+
+            for (NotificationCheck check : notificationChecks)
             {
-                if (check.notifyIfNecessary(msg, _queue, this))
+                if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
                 {
-                    _lastNotificationTimes[check.ordinal()] = currentTime;
+                    if (check.notifyIfNecessary(msg, _queue, this))
+                    {
+                        _lastNotificationTimes[check.ordinal()] = currentTime;
+                    }
                 }
             }
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Feb 28 04:16:41 2008
@@ -363,8 +363,9 @@
 
                     protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
                                                                             deliveryTag, _queue.getMessageCount());
-                    _totalMessageSize.addAndGet(-entry.getSize());
+
                 }
+                _totalMessageSize.addAndGet(-entry.getSize());
 
                 if (!acks)
                 {
@@ -884,7 +885,7 @@
                 {
                     if (!s.isSuspended())
                     {
-                        if (_log.isDebugEnabled())
+                        if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Thu Feb 28 04:16:41 2008
@@ -51,7 +51,7 @@
 
         ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
         {
-            _routingKey = routingKey;
+            _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
             _exchange = exchange;
             _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
         }
@@ -74,8 +74,7 @@
         public int hashCode()
         {
             return (_exchange == null ? 0 : _exchange.hashCode())
-                   + (_routingKey == null ? 0 : _routingKey.hashCode())
-                   + (_arguments == null ? 0 : _arguments.hashCode());
+                   + (_routingKey == null ? 0 : _routingKey.hashCode());
         }
 
         public boolean equals(Object o)
@@ -86,8 +85,7 @@
             }
             ExchangeBinding eb = (ExchangeBinding) o;
             return _exchange.equals(eb._exchange)
-                   && _routingKey.equals(eb._routingKey)
-                   && _arguments.equals(eb._arguments);
+                   && _routingKey.equals(eb._routingKey);
         }
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Feb 28 04:16:41 2008
@@ -29,9 +29,9 @@
     {
         boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
-            int msgCount = queue.getMessageCount();
+            int msgCount;
             final long maximumMessageCount = queue.getMaximumMessageCount();
-            if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+            if (maximumMessageCount!= 0 && (msgCount =  queue.getMessageCount()) >= maximumMessageCount)
             {
                 listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
                 return true;

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb 28 04:16:41 2008
@@ -292,14 +292,17 @@
                 queue.dequeue(storeContext, entry);
             }
 
+/*
+            if (_sendLock.get())
+            {
+                _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
+            }
+*/
+
             synchronized (channel)
             {
                 long deliveryTag = channel.getNextDeliveryTag();
 
-                if (_sendLock.get())
-                {
-                    _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
-                }
 
                 if (_acks)
                 {
@@ -308,10 +311,11 @@
 
                 protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
 
-                if (!_acks)
-                {
-                    entry.getMessage().decrementReference(storeContext);
-                }
+
+            }
+            if (!_acks)
+            {
+                entry.getMessage().decrementReference(storeContext);
             }
         }
         finally
@@ -367,59 +371,60 @@
 //            return false;
         }
 
-        final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+
 
         //todo - client id should be recoreded and this test removed but handled below
-        if (_noLocal && publisher != null)
+        if (_noLocal)
         {
-            // We don't want local messages so check to see if message is one we sent
-            Object localInstance;
-            Object msgInstance;
 
-            if ((protocolSession.getClientProperties() != null) &&
-                (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+            final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+            if(publisher != null)
+
             {
+                // We don't want local messages so check to see if message is one we sent
+                Object localInstance;
+                Object msgInstance;
 
-                if ((publisher.getClientProperties() != null) &&
-                    (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+                if ((protocolSession.getClientProperties() != null) &&
+                    (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                 {
-                    if (localInstance == msgInstance || localInstance.equals(msgInstance))
+
+                    if ((publisher.getClientProperties() != null) &&
+                        (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                     {
-//                        if (_logger.isTraceEnabled())
-//                        {
-//                            _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-//                                          msg.debugIdentity() + ")");
-//                        }
-                        return false;
+                        if (localInstance == msgInstance || localInstance.equals(msgInstance))
+                        {
+    //                        if (_logger.isTraceEnabled())
+    //                        {
+    //                            _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+    //                                          msg.debugIdentity() + ")");
+    //                        }
+                            return false;
+                        }
                     }
                 }
-            }
-            else
-            {
+                else
+                {
 
-                localInstance = protocolSession.getClientIdentifier();
-                //todo - client id should be recoreded and this test removed but handled here
+                    localInstance = protocolSession.getClientIdentifier();
+                    //todo - client id should be recoreded and this test removed but handled here
 
-                msgInstance = publisher.getClientIdentifier();
-                if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
-                {
-//                    if (_logger.isTraceEnabled())
-//                    {
-//                        _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-//                                      msg.debugIdentity() + ")");
-//                    }
-                    return false;
+                    msgInstance = publisher.getClientIdentifier();
+                    if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+                    {
+    //                    if (_logger.isTraceEnabled())
+    //                    {
+    //                        _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+    //                                      msg.debugIdentity() + ")");
+    //                    }
+                        return false;
+                    }
                 }
-            }
-
 
+            }
         }
 
 
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
-        }
         return checkFilters(entry);
 
     }
@@ -433,23 +438,7 @@
 
     private boolean checkFilters(QueueEntry msg)
     {
-        if (_filters != null)
-        {
-//            if (_logger.isTraceEnabled())
-//            {
-//                _logger.trace("(" + debugIdentity() + ") has filters.");
-//            }
-            return _filters.allAllow(msg.getMessage());
-        }
-        else
-        {
-//            if (_logger.isTraceEnabled())
-//            {
-//                _logger.trace("(" + debugIdentity() + ") has no filters");
-//            }
-
-            return true;
-        }
+        return (_filters == null) || _filters.allAllow(msg.getMessage());
     }
 
     public Queue<QueueEntry> getPreDeliveryQueue()
@@ -613,7 +602,7 @@
 
     public boolean wouldSuspend(QueueEntry msg)
     {
-        return channel.wouldSuspend(msg.getMessage());
+        return _acks && channel.wouldSuspend(msg.getMessage());
     }
 
     public Queue<QueueEntry> getResendQueue()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Thu Feb 28 04:16:41 2008
@@ -39,6 +39,7 @@
     private int _currentSubscriber;
 
     private final Object _changeLock = new Object();
+    private volatile boolean _exclusive;
 
 
     /** Accessor for unit tests. */
@@ -116,10 +117,7 @@
      */
     public Subscription nextSubscriber(QueueEntry msg)
     {
-        if (_subscriptions.isEmpty())
-        {
-            return null;
-        }
+
 
         try
         {
@@ -143,30 +141,64 @@
 
     private Subscription nextSubscriberImpl(QueueEntry msg)
     {
-        final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
-        while (iterator.hasNext())
+        if(_exclusive)
         {
-            Subscription subscription = iterator.next();
-            ++_currentSubscriber;
-            subscriberScanned();
-
-            if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+            try
             {
-                if (subscription.hasInterest(msg))
+                Subscription subscription = _subscriptions.get(0);
+                subscriberScanned();
+
+                if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
                 {
-                    // if the queue is not empty then this client is ready to receive a message.
-                    //FIXME the queue could be full of sent messages.
-                    // Either need to clean all PDQs after sending a message
-                    // OR have a clean up thread that runs the PDQs expunging the messages.
-                    if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+                    if (subscription.hasInterest(msg))
                     {
-                        return subscription;
+                        // if the queue is not empty then this client is ready to receive a message.
+                        //FIXME the queue could be full of sent messages.
+                        // Either need to clean all PDQs after sending a message
+                        // OR have a clean up thread that runs the PDQs expunging the messages.
+                        if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+                        {
+                            return subscription;
+                        }
                     }
                 }
             }
+            catch(IndexOutOfBoundsException e)
+            {
+            }
+            return null;
         }
+        else
+        {
+            if (_subscriptions.isEmpty())
+            {
+                return null;
+            }
+            final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+            while (iterator.hasNext())
+            {
+                Subscription subscription = iterator.next();
+                ++_currentSubscriber;
+                subscriberScanned();
 
-        return null;
+                if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+                {
+                    if (subscription.hasInterest(msg))
+                    {
+                        // if the queue is not empty then this client is ready to receive a message.
+                        //FIXME the queue could be full of sent messages.
+                        // Either need to clean all PDQs after sending a message
+                        // OR have a clean up thread that runs the PDQs expunging the messages.
+                        if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+                        {
+                            return subscription;
+                        }
+                    }
+                }
+            }
+
+            return null;
+        }
     }
 
     /** Overridden in test classes. */
@@ -233,5 +265,14 @@
     {
         return _changeLock;
     }
-    
+
+    public void setExclusive(final boolean exclusive)
+    {
+        _exclusive = exclusive;
+    }
+
+    public boolean getExcBoolean()
+    {
+        return _exclusive;
+    }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Thu Feb 28 04:16:41 2008
@@ -22,6 +22,8 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -60,7 +62,7 @@
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
      * by the message handle.
      */
-    private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+    private List<AMQQueue> _destinationQueues;
 
     public MessagePublishInfo getMessagePublishInfo()
     {
@@ -74,7 +76,7 @@
 
     public List<AMQQueue> getDestinationQueues()
     {
-        return _destinationQueues;
+        return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
     }
 
     public void setDestinationQueues(List<AMQQueue> destinationQueues)
@@ -109,6 +111,10 @@
 
     public void addDestinationQueue(AMQQueue queue)
     {
+        if(_destinationQueues == null)
+        {
+            _destinationQueues = new ArrayList<AMQQueue>();
+        }
         _destinationQueues.add(queue);
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Feb 28 04:16:41 2008
@@ -49,11 +49,11 @@
     /** Where to put undeliverable messages */
     private final List<RequiredDeliveryException> _returnMessages;
 
-    private Set<Long> _browsedAcks;
+    private final Set<Long> _browsedAcks;
 
     private final MessageStore _messageStore;
 
-    private StoreContext _storeContext;
+    private final StoreContext _storeContext;
 
     /** Whether we are in a transaction */
     private boolean _inTran;

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Thu Feb 28 04:16:41 2008
@@ -24,7 +24,6 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Feb 28 04:16:41 2008
@@ -23,6 +23,7 @@
 import javax.management.NotCompliantMBeanException;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.security.access.AccessManager;
@@ -126,7 +127,7 @@
      */
     public VirtualHost(String name, MessageStore store) throws Exception
     {
-        this(name, null, store);
+        this(name, new PropertiesConfiguration(), store);
     }
 
     /**

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Feb 28 04:16:41 2008
@@ -178,7 +178,7 @@
     public void testQueueDepthAlertWithSubscribers() throws Exception
     {
         protocolSession = new TestMinaProtocolSession();
-        AMQChannel channel = new AMQChannel(protocolSession, 2,_txm, _messageStore, null);
+        AMQChannel channel = new AMQChannel(protocolSession, 2,_txm, _messageStore);
         protocolSession.addChannel(channel);
 
         // Create queue

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Feb 28 04:16:41 2008
@@ -129,7 +129,7 @@
 
 
         TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
-        AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore, null);
+        AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore);
         protocolSession.addChannel(channel);
 
         _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml Thu Feb 28 04:16:41 2008
@@ -1,224 +1,224 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements. See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership. The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License. You may obtain a copy of the License at
-    
-    http://www.apache.org/licenses/LICENSE-2.0
-    
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied. See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-client-java14</artifactId>
-    <packaging>jar</packaging>
-    <version>1.0-incubating-M3-SNAPSHOT</version>
-    <name>Qpid Client for Java 1.4</name>
-    <url>http://cwiki.apache.org/confluence/display/qpid</url>
-
-    <parent>
-        <groupId>org.apache.qpid</groupId>
-        <artifactId>qpid</artifactId>
-        <version>1.0-incubating-M3-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <properties>
-        <topDirectoryLocation>..</topDirectoryLocation>
-        <java.source.version>1.4</java.source.version>
-        <qpid.version>${pom.version}</qpid.version>
-        <qpid.targetDir>${project.build.directory}</qpid.targetDir>
-        <!--<qpid.root>${basedir}/..</qpid.root>-->
-        <sasl.properties>${basedir}/etc/sasl.properties</sasl.properties>
-
-        <!-- This is a dummy value to ensure this property exists, override in your settings.xml to your real 1.4 jdk location to run tests. -->
-        <jvm.1.4.bin>path/to/java1.4</jvm.1.4.bin>
-    </properties>
-
-    <dependencies>
-
-        <!-- These dependencies have to be re-declared here, because exluding the normal (non 1.4) client and common from the distribution takes out
-             these transitive dependencies too. -->
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.mina</groupId>
-            <artifactId>mina-core</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.geronimo.specs</groupId>
-            <artifactId>geronimo-jms_1.1_spec</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-collections</groupId>
-            <artifactId>commons-collections</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-
-        <!-- Use the java 1.4 retrotranslated client. -->
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-client</artifactId>
-            <type>jar</type>
-            <version>${pom.version}</version>
-            <classifier>java14</classifier>
-        </dependency>
-
-        <!-- Use the java 1.4 retrotranslated common library. -->
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-common</artifactId>
-            <type>jar</type>
-            <version>${pom.version}</version>
-            <classifier>java14</classifier>
-        </dependency>
-
-        <!-- Use the java 1.4 retrotranslated integration tests. -->
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-integrationtests</artifactId>
-            <type>jar</type>
-            <version>${pom.version}</version>
-            <classifier>java14</classifier>
-            <!--<scope>test</scope>-->
-        </dependency>
-
-        <dependency>
-            <groupId>net.sf.retrotranslator</groupId>                
-            <artifactId>retrotranslator-runtime</artifactId>
-            <scope>package</scope>
-        </dependency>
-
-        <!-- Test dependencies. -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-
-            <!-- Sets up the compiler plugin to compile on 1.4 -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>${java.source.version}</source>
-                    <target>${java.source.version}</target>
-                </configuration>
-            </plugin>
-
-            <!-- Sets up the assembly plugin to use the assembly directions to build a 1.4 compatable client. -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>${assembly.version}</version>
-
-                <executions>
-
-                    <!-- Produces the distribution. -->
-                    <execution>
-                        <id>assembly-dist</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>attached</goal>
-                        </goals>
-                        <configuration>
-                            <descriptors>
-                                <descriptor>src/main/assembly/client-java14-bin.xml</descriptor>
-                            </descriptors>
-                            <finalName>qpid-${pom.version}</finalName>
-                            <outputDirectory>${qpid.targetDir}</outputDirectory>
-                            <tarLongFileMode>gnu</tarLongFileMode>
-                        </configuration>
-                    </execution>
-                    
-                    <!-- Produces a jar with all test dependencies in it. For convenience in running tests from command line. -->
-                    <!-- Todo: Replace this with a manifest only jar, its much quicker to build that. -->
-                    <execution>
-                        <id>assembly-alltestdeps</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>attached</goal>
-                        </goals>
-                        <configuration>
-                            <descriptors>
-                                <descriptor>src/main/assembly/jar-with-dependencies.xml</descriptor>
-                            </descriptors>
-                            <outputDirectory>target</outputDirectory>
-                            <workDirectory>target/assembly/work</workDirectory>
-                        </configuration>              
-                    </execution>
-
-                </executions>
-            </plugin>
-
-            <!-- Sets up surefire to run during the integration-test phase instead of the test phase. -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>surefire-it</id>
-                        <phase>integration-test</phase>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                        <configuration>
-                            <skip>true</skip>
-                            <forkMode>once</forkMode>
-                            <jvm>${jvm.1.4.bin}</jvm>
-                            <systemProperties>
-                                <property>
-                                    <name>amqj.logging.level</name>
-                                    <value>${amqj.logging.level}</value>
-                                </property>
-                                <property>
-                                    <name>log4j.configuration</name>
-                                    <value>${log4j.configuration}</value>
-                                </property>
-                                <property>
-                                    <name>amq.dynamicsaslregistrar.properties</name>
-                                    <value>${sasl.properties}</value>
-                                </property>
-                            </systemProperties>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-        </plugins>
-    </build>
-</project>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+    
+    http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>qpid-client-java14</artifactId>
+    <packaging>jar</packaging>
+    <version>1.0-incubating-M2.1-SNAPSHOT</version>
+    <name>Qpid Client for Java 1.4</name>
+    <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid</artifactId>
+        <version>1.0-incubating-M2.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <properties>
+        <topDirectoryLocation>..</topDirectoryLocation>
+        <java.source.version>1.4</java.source.version>
+        <qpid.version>${pom.version}</qpid.version>
+        <qpid.targetDir>${project.build.directory}</qpid.targetDir>
+        <!--<qpid.root>${basedir}/..</qpid.root>-->
+        <sasl.properties>${basedir}/etc/sasl.properties</sasl.properties>
+
+        <!-- This is a dummy value to ensure this property exists, override in your settings.xml to your real 1.4 jdk location to run tests. -->
+        <jvm.1.4.bin>path/to/java1.4</jvm.1.4.bin>
+    </properties>
+
+    <dependencies>
+
+        <!-- These dependencies have to be re-declared here, because excluding the normal (non 1.4) client and common from the distribution takes out
+             these transitive dependencies too. -->
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.mina</groupId>
+            <artifactId>mina-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+
+        <!-- Use the java 1.4 retrotranslated client. -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+            <type>jar</type>
+            <version>${pom.version}</version>
+            <classifier>java14</classifier>
+        </dependency>
+
+        <!-- Use the java 1.4 retrotranslated common library. -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-common</artifactId>
+            <type>jar</type>
+            <version>${pom.version}</version>
+            <classifier>java14</classifier>
+        </dependency>
+
+        <!-- Use the java 1.4 retrotranslated integration tests. -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-integrationtests</artifactId>
+            <type>jar</type>
+            <version>${pom.version}</version>
+            <classifier>java14</classifier>
+            <!--<scope>test</scope>-->
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.retrotranslator</groupId>                
+            <artifactId>retrotranslator-runtime</artifactId>
+            <scope>package</scope>
+        </dependency>
+
+        <!-- Test dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- Sets up the compiler plugin to compile on 1.4 -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${java.source.version}</source>
+                    <target>${java.source.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Sets up the assembly plugin to use the assembly directions to build a 1.4 compatible client. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${assembly.version}</version>
+
+                <executions>
+
+                    <!-- Produces the distribution. -->
+                    <execution>
+                        <id>assembly-dist</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attached</goal>
+                        </goals>
+                        <configuration>
+                            <descriptors>
+                                <descriptor>src/main/assembly/client-java14-bin.xml</descriptor>
+                            </descriptors>
+                            <finalName>qpid-${pom.version}</finalName>
+                            <outputDirectory>${qpid.targetDir}</outputDirectory>
+                            <tarLongFileMode>gnu</tarLongFileMode>
+                        </configuration>
+                    </execution>
+                    
+                    <!-- Produces a jar with all test dependencies in it. For convenience in running tests from command line. -->
+                    <!-- TODO: Replace this with a manifest-only jar; it is much quicker to build. -->
+                    <execution>
+                        <id>assembly-alltestdeps</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attached</goal>
+                        </goals>
+                        <configuration>
+                            <descriptors>
+                                <descriptor>src/main/assembly/jar-with-dependencies.xml</descriptor>
+                            </descriptors>
+                            <outputDirectory>target</outputDirectory>
+                            <workDirectory>target/assembly/work</workDirectory>
+                        </configuration>              
+                    </execution>
+
+                </executions>
+            </plugin>
+
+            <!-- Sets up surefire to run during the integration-test phase instead of the test phase. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>surefire-it</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <skip>true</skip>
+                            <forkMode>once</forkMode>
+                            <jvm>${jvm.1.4.bin}</jvm>
+                            <systemProperties>
+                                <property>
+                                    <name>amqj.logging.level</name>
+                                    <value>${amqj.logging.level}</value>
+                                </property>
+                                <property>
+                                    <name>log4j.configuration</name>
+                                    <value>${log4j.configuration}</value>
+                                </property>
+                                <property>
+                                    <name>amq.dynamicsaslregistrar.properties</name>
+                                    <value>${sasl.properties}</value>
+                                </property>
+                            </systemProperties>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml Thu Feb 28 04:16:41 2008
@@ -79,7 +79,7 @@
         </dependency>
 
         <dependency>
-            <groupId>uk.co.thebadgerset</groupId>
+            <groupId>org.apache.qpid</groupId>
             <artifactId>junit-toolkit</artifactId>
             <scope>test</scope>
         </dependency>

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Feb 28 04:16:41 2008
@@ -73,6 +73,105 @@
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
+    /** Channel-id to session map: ids 0-15 are array slots, all other ids go in a LinkedHashMap; null is never stored. */
+    public static final class ChannelToSessionMap
+    {
+        private static final int FAST_CHANNEL_COUNT = 16;
+        private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+        private final AMQSession[] _fastAccessSessions = new AMQSession[FAST_CHANNEL_COUNT];
+        private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+        private int _size = 0;
+
+        /** Returns the session registered for channelId, or null if there is none. */
+        public AMQSession get(int channelId)
+        {
+            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                return _fastAccessSessions[channelId];
+            }
+            return _slowAccessSessions.get(channelId);
+        }
+
+        /** Registers session for channelId; a null session removes the mapping. Returns session, not the old value. */
+        public AMQSession put(int channelId, AMQSession session)
+        {
+            if(session == null)
+            {
+                // Never store null in the map: values() would return it and callers would dereference it.
+                remove(channelId);
+                return null;
+            }
+            AMQSession oldVal;
+            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                oldVal = _fastAccessSessions[channelId];
+                _fastAccessSessions[channelId] = session;
+            }
+            else
+            {
+                oldVal = _slowAccessSessions.put(channelId, session);
+            }
+            if(oldVal == null)
+            {
+                _size++;
+            }
+            return session;
+        }
+
+        /** Removes the mapping for channelId and returns its session, or null if there was none. */
+        public AMQSession remove(int channelId)
+        {
+            AMQSession session;
+            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                session = _fastAccessSessions[channelId];
+                _fastAccessSessions[channelId] = null;
+            }
+            else
+            {
+                session = _slowAccessSessions.remove(channelId);
+            }
+            if(session != null)
+            {
+                _size--;
+            }
+            return session;
+        }
+
+        /** Returns a new list of all sessions: array slots in index order, then map values in insertion order. */
+        public Collection<AMQSession> values()
+        {
+            ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+            for(int i = 0; i < FAST_CHANNEL_COUNT; i++)
+            {
+                if(_fastAccessSessions[i] != null)
+                {
+                    values.add(_fastAccessSessions[i]);
+                }
+            }
+            values.addAll(_slowAccessSessions.values());
+            return values;
+        }
+
+        /** Returns the number of channel ids that currently have a session. */
+        public int size()
+        {
+            return _size;
+        }
+
+        /** Removes every mapping. */
+        public void clear()
+        {
+            _size = 0;
+            _slowAccessSessions.clear();
+            for(int i = 0; i < FAST_CHANNEL_COUNT; i++)
+            {
+                _fastAccessSessions[i] = null;
+            }
+        }
+    }
+
+
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
     protected AtomicInteger _idFactory = new AtomicInteger(0);
@@ -102,7 +201,7 @@
     protected AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+    private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
 
     private String _clientName;
 
@@ -266,6 +365,26 @@
             _delegate = new AMQConnectionDelegate_0_8(this);
         }
 
+        final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
+
+        class Listener implements ExceptionListener
+        {
+            public void onException(JMSException e)
+            {
+                exceptions.add(e);
+            }
+        }
+
+        try
+        {
+            setExceptionListener(new Listener());
+        }
+        catch (JMSException e)
+        {
+            // Shouldn't happen
+            throw new AMQException(null, null, e);
+        }
+
         if (_logger.isInfoEnabled())
         {
             _logger.info("Connection:" + connectionURL);
@@ -322,8 +441,6 @@
             try
             {
                 makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
-                lastException = null;
-                _connected = true;
             }
             catch (Exception e)
             {
@@ -351,7 +468,23 @@
         {
             String message = null;
 
-            if (lastException != null)
+            if (exceptions.size() > 0)
+            {
+                JMSException e = exceptions.get(exceptions.size() - 1);
+                int code = -1;
+                try
+                {
+                    code = new Integer(e.getErrorCode()).intValue();
+                }
+                catch (NumberFormatException nfe)
+                {
+                    // Ignore this, we have some error codes and messages swapped around
+                }
+
+                throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
+                                                        e.getMessage(), e);
+            }
+            else if (lastException != null)
             {
                 if (lastException.getCause() != null)
                 {
@@ -671,10 +804,10 @@
         checkNotClosed();
         if (!_started)
         {
-            final Iterator it = _sessions.entrySet().iterator();
+            final Iterator it = _sessions.values().iterator();
             while (it.hasNext())
             {
-                final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+                final AMQSession s = (AMQSession) (it.next());
                 try
                 {
                     s.start();
@@ -927,11 +1060,11 @@
         return _maximumFrameSize;
     }
 
-    public Map getSessions()
+    public ChannelToSessionMap getSessions()
     {
         return _sessions;
     }
-
+    
     public String getUsername()
     {
         return _username;
@@ -1149,6 +1282,24 @@
     void deregisterSession(int channelId)
     {
         _sessions.remove(channelId);
+    }
+
+    /**
+     * Reopens the channel of every session on this connection, resubscribes the session's consumers and sets the
+     * session's flow control to active. Called during failover handling; the caller must hold the failover mutex.
+     */
+    public void resubscribeSesssions() throws JMSException, AMQException, FailoverException
+    {
+        // Iterate over a private, typed copy of the session collection.
+        final ArrayList<AMQSession> sessions = new ArrayList<AMQSession>(_sessions.values());
+        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        for (AMQSession s : sessions)
+        {
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+            s.resubscribe();
+            // Flag flow as active on the session once its channel has been reopened.
+            s.setFlowControl(true);
+        }
     }
 
     public String toString()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Thu Feb 28 04:16:41 2008
@@ -88,6 +88,7 @@
         checkState();
         final BasicMessageConsumer consumer =
             (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+        consumer.closeWhenNoMessages(true);
         _consumers.add(consumer);
 
         return new Enumeration()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb 28 04:16:41 2008
@@ -23,6 +23,7 @@
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -109,6 +110,89 @@
  */
 public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
+    /**
+     * Maps integer consumer ids to consumers. Ids 0-15 are held in a fixed array for fast lookup; every other id
+     * (including negative values) is kept in a {@link ConcurrentHashMap}.
+     * NOTE(review): the array slots are read and written without synchronization - confirm callers tolerate this.
+     */
+    public static final class IdToConsumerMap
+    {
+        private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+        private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+
+        /** Returns true when no bit above the low four is set, i.e. the id is a valid index into the array. */
+        private static boolean isFastAccess(int id)
+        {
+            return (id & 0xFFFFFFF0) == 0;
+        }
+
+        /** Returns the consumer registered under the id, or null if there is none. */
+        public BasicMessageConsumer get(int id)
+        {
+            return isFastAccess(id) ? _fastAccessConsumers[id] : _slowAccessConsumers.get(id);
+        }
+
+        /**
+         * Registers the consumer under the id.
+         *
+         * @return the consumer previously registered under the id, or null if there was none
+         */
+        public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+        {
+            if (!isFastAccess(id))
+            {
+                return _slowAccessConsumers.put(id, consumer);
+            }
+
+            // Hand back the displaced entry, as Map.put does, rather than the value just stored.
+            final BasicMessageConsumer oldVal = _fastAccessConsumers[id];
+            _fastAccessConsumers[id] = consumer;
+            return oldVal;
+        }
+
+        /** Removes the registration for the id and returns the consumer that held it, or null if there was none. */
+        public BasicMessageConsumer remove(int id)
+        {
+            if (!isFastAccess(id))
+            {
+                return _slowAccessConsumers.remove(id);
+            }
+
+            final BasicMessageConsumer consumer = _fastAccessConsumers[id];
+            _fastAccessConsumers[id] = null;
+            return consumer;
+        }
+
+        /** Returns a newly allocated collection holding every registered consumer, array entries first. */
+        public Collection<BasicMessageConsumer> values()
+        {
+            final ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+            for (BasicMessageConsumer consumer : _fastAccessConsumers)
+            {
+                if (consumer != null)
+                {
+                    values.add(consumer);
+                }
+            }
+            values.addAll(_slowAccessConsumers.values());
+
+            return values;
+        }
+
+        /** Removes every registered consumer from both the array and the map. */
+        public void clear()
+        {
+            _slowAccessConsumers.clear();
+            for (int i = 0; i < _fastAccessConsumers.length; i++)
+            {
+                _fastAccessConsumers[i] = null;
+            }
+        }
+    }
+
+
+
+
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -158,7 +242,7 @@
     protected boolean _transacted;
 
     /** Holds the sessions acknowledgement mode. */
-    protected int _acknowledgeMode;
+    protected final int _acknowledgeMode;
 
     /** Holds this session unique identifier, used to distinguish it from other sessions. */
     protected int _channelId;
@@ -224,8 +308,10 @@
      * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
      * consumer.
      */
-    protected Map<AMQShortString, BasicMessageConsumer> _consumers =
-            new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+    protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+    
+            //Map<AMQShortString, BasicMessageConsumer> _consumers =
+            //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /**
      * Contains a list of consumers which have been removed but which might still have
@@ -288,6 +374,27 @@
     /** Has failover occured on this session */
     private boolean _failedOver;
 
+    /** Flow-control flag for a session; its monitor is what checkFlowControl() waits on until the flag is true. */
+    private static final class FlowControlIndicator
+    {
+        private volatile boolean _flowControl = true;
+
+        /** Stores the new state and wakes every thread waiting on this object so that each can re-check the flag. */
+        public synchronized void setFlowControl(boolean flowControl)
+        {
+            _flowControl = flowControl;
+            notifyAll();
+        }
+
+        public boolean getFlowControl()
+        {
+            return _flowControl;
+        }
+    }
+
+    /** Flow control */
+    private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
     /**
      * Creates a new session on a connection.
      *
@@ -334,24 +441,20 @@
                                                      {
                                                          public void aboveThreshold(int currentValue)
                                                          {
-                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                                                             {
                                                                  _logger.debug(
                                                                          "Above threshold(" + _defaultPrefetchHighMark
                                                                          + ") so suspending channel. Current value is " + currentValue);
                                                                  new Thread(new SuspenderRunner(true)).start();
-                                                             }
+
                                                          }
 
                                                          public void underThreshold(int currentValue)
                                                          {
-                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                                                             {
                                                                  _logger.debug(
                                                                          "Below threshold(" + _defaultPrefetchLowMark
                                                                          + ") so unsuspending channel. Current value is " + currentValue);
                                                                  new Thread(new SuspenderRunner(false)).start();
-                                                             }
+
                                                          }
                                                      });
         }
@@ -697,7 +800,7 @@
     {
 
         // Remove the consumer from the map
-        BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+        BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
         if (consumer != null)
         {
             // fixme this isn't right.. needs to check if _queue contains data for this consumer
@@ -787,6 +890,7 @@
                                   false, false);
     }
 
+
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
@@ -804,6 +908,17 @@
                                   messageSelector, null, false, false);
     }
 
+    /** Checks the destination, then calls createConsumerImpl with the default prefetch marks; true is presumably 'exclusive'. */
+    public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+                                  messageSelector, null, false, false);
+    }
+
+
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
                                           String selector) throws JMSException
     {
@@ -905,7 +1020,7 @@
     {
         checkNotClosed();
 
-        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
     }
 
     public Queue createQueue(String queueName) throws JMSException
@@ -1072,9 +1187,10 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
     }
 
+
     /**
      * Creates a non-durable subscriber with a message selector
      *
@@ -1092,7 +1208,7 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
     }
 
     public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1648,7 +1764,7 @@
      */
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
-        if (_consumers.remove(consumer.getConsumerTag()) != null)
+        if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
         {
             String subscriptionName = _reverseSubscriptionMap.remove(consumer);
             if (subscriptionName != null)
@@ -2047,8 +2163,9 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
+        int tagId = _nextTag++;
         // need to generate a consumer tag on the client so we can exploit the nowait flag
-        AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+        AMQShortString tag = new AMQShortString(Integer.toString(tagId));
 
         FieldTable arguments = FieldTableFactory.newFieldTable();
         if ((messageSelector != null) && !messageSelector.equals(""))
@@ -2068,7 +2185,7 @@
 
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
-        _consumers.put(tag, consumer);
+        _consumers.put(tagId, consumer);
 
         try
         {
@@ -2096,7 +2213,7 @@
         catch (AMQException e)
         {
             // clean-up the map in the event of an error
-            _consumers.remove(tag);
+            _consumers.remove(tagId);
             throw e;
         }
     }
@@ -2670,6 +2787,25 @@
         _ticket = ticket;
     }
 
+    /** Updates this session's flow-control flag and notifies waiting in checkFlowControl() via the indicator. */
+    public void setFlowControl(final boolean active)
+    {
+        _flowControl.setFlowControl(active);
+    }
+
+    /** Blocks the calling thread until the flow-control flag is true; returns immediately if it already is. */
+    public void checkFlowControl() throws InterruptedException
+    {
+        synchronized(_flowControl)
+        {
+            while(!_flowControl.getFlowControl())
+            {
+                _flowControl.wait();
+            }
+        }
+    }
+
+
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     private class Dispatcher extends Thread
     {
@@ -2874,38 +3010,38 @@
             //This if block is not needed anymore as bounce messages are handled separately
             //if (message.getDeliverBody() != null)
             //{
-                final BasicMessageConsumer consumer =
-                    (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag()));
+            final BasicMessageConsumer consumer =
+                _consumers.get(message.getConsumerTag().toIntValue());
 
-                if ((consumer == null) || consumer.isClosed())
+            if ((consumer == null) || consumer.isClosed())
+            {
+                if (_dispatcherLogger.isInfoEnabled())
                 {
-                    if (_dispatcherLogger.isInfoEnabled())
+                    if (consumer == null)
                     {
-                        if (consumer == null)
-                        {
-                            _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+                        _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
                                 + message.getDeliveryTag() + "] from queue "
                                 + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
-                        }
-                        else
-                        {
-                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliveryTag() + "] from queue " + " consumer("
-                                + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
-                        }
                     }
-                    // Don't reject if we're already closing
-                    if (!_closed.get())
+                    else
                     {
-                        rejectMessage(message, true);
+                        _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+                                + message.getDeliveryTag() + "] from queue " + " consumer("
+                                + message.getConsumerTag() + ") is closed rejecting(requeue)...");
                     }
                 }
-                else
+                // Don't reject if we're already closing
+                if (!_closed.get())
                 {
-                    consumer.notifyMessage(message, _channelId);
+                    rejectMessage(message, true);
                 }
             }
-        //}
+            else
+            {
+                consumer.notifyMessage(message);
+            }
+
+        }
     }
 
     /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Feb 28 04:16:41 2008
@@ -222,7 +222,7 @@
 
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
-        _consumers.put(tag, consumer);
+        _consumers.put(tag.toIntValue(), consumer);
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame jmsConsume = getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
                 queueName,