You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC

svn commit: r1299257 [7/26] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/ broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/ broker-plugins/access-control/src/main/java/org/apache/qpid/server...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,20 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.util.concurrent.ConcurrentSkipListSet;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 
-public interface AMQSessionModel
+/**
+ * Session model interface.
+ * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
+ * when monitoring the blocking and unblocking of queues/sessions in {@link SimpleAMQQueue}.
+ */
+public interface AMQSessionModel extends Comparable<AMQSessionModel>
 {
     public Object getID();
 
@@ -57,4 +65,7 @@ public interface AMQSessionModel
     void block(AMQQueue queue);
 
     void unblock(AMQQueue queue);
+
+
+    boolean onSameConnection(InboundMessage inbound);
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,11 @@
 package org.apache.qpid.server.protocol;
 
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -29,10 +34,6 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
 public class MultiVersionProtocolEngine implements ServerProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
@@ -44,29 +45,35 @@ public class MultiVersionProtocolEngine 
     private IApplicationRegistry _appRegistry;
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
+    private final AmqpProtocolVersion _defaultSupportedReply;
 
     private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
-    public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
-                                      String fqdn,
-                                      Set<AmqpProtocolVersion> supported,
-                                      NetworkConnection network,
-                                      long id)
+    public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+                                      final Set<AmqpProtocolVersion> supported,
+                                      final AmqpProtocolVersion defaultSupportedReply,
+                                      final long id,
+                                      final NetworkConnection network)
     {
-        this(appRegistry,fqdn,supported,id);
+        this(appRegistry, supported, defaultSupportedReply, id);
         setNetworkConnection(network);
     }
 
-    public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
-                                      String fqdn,
-                                      Set<AmqpProtocolVersion> supported,
-                                      long id)
+    public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+                                      final Set<AmqpProtocolVersion> supported,
+                                      final AmqpProtocolVersion defaultSupportedReply,
+                                      final long id)
     {
+        if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
+        {
+            throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+                                             + ") to an unsupported protocol version initiation is itself not supported!");
+        }
+
         _id = id;
         _appRegistry = appRegistry;
-        _fqdn = fqdn;
         _supported = supported;
-
+        _defaultSupportedReply = defaultSupportedReply;
     }
 
 
@@ -198,6 +205,15 @@ public class MultiVersionProtocolEngine 
     public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
     {
         _network = network;
+        SocketAddress address = _network.getLocalAddress();
+        if (address instanceof InetSocketAddress)
+        {
+            _fqdn = ((InetSocketAddress) address).getHostName();
+        }
+        else
+        {
+            throw new IllegalArgumentException("Unsupported socket address class: " + address);
+        }
         _sender = sender;
     }
 
@@ -445,14 +461,18 @@ public class MultiVersionProtocolEngine 
 
 
                 ServerProtocolEngine newDelegate = null;
-                byte[] newestSupported = null;
+                byte[] supportedReplyBytes = null;
+                byte[] defaultSupportedReplyBytes = null;
+                AmqpProtocolVersion supportedReplyVersion = null;
 
+                //Check the supported versions for a header match, and if there is one save the
+                //delegate. Also save most recent supported version and associated reply header bytes
                 for(int i = 0; newDelegate == null && i < _creators.length; i++)
                 {
-
                     if(_supported.contains(_creators[i].getVersion()))
                     {
-                        newestSupported = _creators[i].getHeaderIdentifier();
+                        supportedReplyBytes = _creators[i].getHeaderIdentifier();
+                        supportedReplyVersion = _creators[i].getVersion();
                         byte[] compareBytes = _creators[i].getHeaderIdentifier();
                         boolean equal = true;
                         for(int j = 0; equal && j<compareBytes.length; j++)
@@ -464,12 +484,35 @@ public class MultiVersionProtocolEngine 
                             newDelegate = _creators[i].getProtocolEngine();
                         }
                     }
+
+                    //If there is a configured default reply to an unsupported version initiation,
+                    //then save the associated reply header bytes when we encounter them
+                    if(_defaultSupportedReply != null && _creators[i].getVersion() == _defaultSupportedReply)
+                    {
+                        defaultSupportedReplyBytes = _creators[i].getHeaderIdentifier();
+                    }
                 }
 
-                // If no delegate is found then send back the most recent support protocol version id
+                // If no delegate is found then send back a supported protocol version id
                 if(newDelegate == null)
                 {
-                    _sender.send(ByteBuffer.wrap(newestSupported));
+                    //if a default reply was specified use its reply header instead of the most recent supported version
+                    if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion))
+                    {
+                        if(_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Default reply to unsupported protocol version was configured, changing reply from "
+                                    + supportedReplyVersion + " to " + _defaultSupportedReply);
+                        }
+
+                        supportedReplyBytes = defaultSupportedReplyBytes;
+                        supportedReplyVersion = _defaultSupportedReply;
+                    }
+                    if(_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion);
+                    }
+                    _sender.send(ByteBuffer.wrap(supportedReplyBytes));
                     _sender.flush();
 
                     _delegate = new ClosedDelegateProtocolEngine();
@@ -482,7 +525,6 @@ public class MultiVersionProtocolEngine 
                     _delegate = newDelegate;
 
                     _header.flip();
-                    _delegate.setNetworkConnection(_network, _sender);
                     _delegate.received(_header);
                     if(msg.hasRemaining())
                     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sat Mar 10 19:22:10 2012
@@ -20,38 +20,38 @@
 */
 package org.apache.qpid.server.protocol;
 
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
     private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
     private final IApplicationRegistry _appRegistry;
-    private final String _fqdn;
     private final Set<AmqpProtocolVersion> _supported;
+    private final AmqpProtocolVersion _defaultSupportedReply;
 
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+    public MultiVersionProtocolEngineFactory(final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply)
     {
+        if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
+        {
+            throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+                                             + ") to an unsupported protocol version initiation is itself not supported!");
+        }
+
         _appRegistry = ApplicationRegistry.getInstance();
-        _fqdn = fqdn;
         _supported = supportedVersions;
-    }
-
-    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
-    {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+        _defaultSupportedReply = defaultSupportedReply;
     }
 
     public ServerProtocolEngine newProtocolEngine()
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
+        return new MultiVersionProtocolEngine(_appRegistry, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement());
     }
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Sat Mar 10 19:22:10 2012
@@ -21,15 +21,19 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.server.configuration.*;
-import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -86,8 +90,8 @@ public class ProtocolEngine_0_10  extend
         _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
 
         // FIXME Two log messages to maintain compatibility with earlier protocol versions
-        _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
-        _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+        _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
+        _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
     }
 
     public SocketAddress getRemoteAddress()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sat Mar 10 19:22:10 2012
@@ -29,7 +29,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -54,6 +53,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.TopicExchange;
@@ -135,7 +135,7 @@ public class SendingLink_1_0 implements 
 
                             actualFilters.put(entry.getKey(), entry.getValue());
                         }
-                        catch (AMQInvalidArgumentException e)
+                        catch (ParseException e)
                         {
                             Error error = new Error();
                             error.setCondition(AmqpError.INVALID_FIELD);

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 10 19:22:10 2012
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1299226

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sat Mar 10 19:22:10 2012
@@ -20,9 +20,10 @@
 */
 package org.apache.qpid.server.queue;
 
-import java.util.Map;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Map;
+
 public class AMQPriorityQueue extends OutOfOrderQueue
 {
     protected AMQPriorityQueue(final String name,
@@ -39,6 +40,6 @@ public class AMQPriorityQueue extends Ou
 
     public int getPriorities()
     {
-        return ((PriorityQueueList) _entries).getPriorities();
+        return ((PriorityQueueList) getEntries()).getPriorities();
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sat Mar 10 19:22:10 2012
@@ -22,20 +22,18 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfig;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
@@ -142,10 +140,9 @@ public interface AMQQueue extends Managa
     public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
 
 
-    void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        ServerTransaction transaction);
+    void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
 
-    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
+    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
 
     void removeMessagesFromQueue(long fromMessageId, long toMessageId);
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -35,6 +33,9 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class AMQQueueFactory
 {
     public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
@@ -48,6 +49,10 @@ public class AMQQueueFactory
     public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
     public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
 
+    private AMQQueueFactory()
+    {
+    }
+
     private abstract static class QueueProperty
     {
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Sat Mar 10 19:22:10 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
@@ -31,12 +32,10 @@ import org.apache.qpid.management.common
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.transport.MessageProperties;
 
 import javax.management.JMException;
@@ -56,9 +55,10 @@ import javax.management.openmbean.Simple
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 
-import java.text.SimpleDateFormat;
-import java.util.*;
 
 /**
  * AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -72,11 +72,13 @@ public class AMQQueueMBean extends AMQMa
 {
 
     /** Used for debugging purposes. */
-    private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+    private static final Logger LOGGER = Logger.getLogger(AMQQueueMBean.class);
 
     /** Date/time format used for message expiration and message timestamp formatting */
     public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z";
 
+    private static final FastDateFormat FAST_DATE_FORMAT = FastDateFormat.getInstance(JMSTIMESTAMP_DATETIME_FORMAT);
+
     private final AMQQueue _queue;
     private final String _queueName;
     // OpenMBean data types for viewMessages method
@@ -347,14 +349,14 @@ public class AMQQueueMBean extends AMQMa
     public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
     {
         // important : add log to the log file - monitoring tools may be looking for this
-        _logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
+        LOGGER.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
         notificationMsg = notification.name() + " " + notificationMsg;
 
         _lastNotification =
-            new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+            new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, incrementAndGetSequenceNumber(),
                 System.currentTimeMillis(), notificationMsg);
 
-        _broadcaster.sendNotification(_lastNotification);
+        getBroadcaster().sendNotification(_lastNotification);
     }
 
     public Notification getLastNotification()
@@ -491,7 +493,7 @@ public class AMQQueueMBean extends AMQMa
                     ContentHeaderBody headerBody = msg.getContentHeaderBody();
                     // Create header attributes list
                     headerAttributes = getMessageHeaderProperties(headerBody);
-                    itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
+                    itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.getBodySize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 else if(serverMsg instanceof MessageTransferMessage)
                 {
@@ -589,18 +591,8 @@ public class AMQQueueMBean extends AMQMa
     private void addStringifiedJMSTimestamoAndJMSExpiration(final List<String> list,
             final long expirationDate, final long timestampDate)
     {
-        final SimpleDateFormat dateFormat;
-        if (expirationDate != 0 || timestampDate != 0)
-        {
-            dateFormat = new SimpleDateFormat(JMSTIMESTAMP_DATETIME_FORMAT);
-        }
-        else
-        {
-            dateFormat = null;
-        }
-
-        final String formattedExpirationDate = (expirationDate != 0) ? dateFormat.format(new Date(expirationDate)) : null;
-        final String formattedTimestampDate = (timestampDate != 0) ? dateFormat.format(new Date(timestampDate)) : null;
+        final String formattedExpirationDate = (expirationDate != 0) ? FAST_DATE_FORMAT.format(expirationDate) : null;
+        final String formattedTimestampDate = (timestampDate != 0) ? FAST_DATE_FORMAT.format(timestampDate) : null;
         list.add("JMSExpiration = " + formattedExpirationDate);
         list.add("JMSTimestamp = " + formattedTimestampDate);
     }
@@ -619,9 +611,7 @@ public class AMQQueueMBean extends AMQMa
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
-        txn.commit();
+        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName);
     }
 
     /**
@@ -654,13 +644,7 @@ public class AMQQueueMBean extends AMQMa
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
-        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
-
-        txn.commit();
-
-
+        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName);
     }
 
     /**

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Sat Mar 10 19:22:10 2012
@@ -21,10 +21,10 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 public interface BaseQueue extends TransactionLogResource
 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Sat Mar 10 19:22:10 2012
@@ -41,7 +41,7 @@ public class ConflationQueue extends Sim
 
     public String getConflationKey()
     {
-        return ((ConflationQueueList) _entries).getConflationKey();
+        return ((ConflationQueueList) getEntries()).getConflationKey();
     }
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Sat Mar 10 19:22:10 2012
@@ -21,13 +21,13 @@
 
 package org.apache.qpid.server.queue;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class ConflationQueueList extends SimpleQueueEntryList
 {
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,6 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.AMQMessageHeader;
 
 public interface Filterable

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Sat Mar 10 19:22:10 2012
@@ -21,9 +21,9 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
 
 public class InboundMessageAdapter implements InboundMessage
 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sat Mar 10 19:22:10 2012
@@ -20,25 +20,26 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoredMessage;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.nio.ByteBuffer;
 
 public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
 {
@@ -69,8 +70,6 @@ public class IncomingMessage implements 
 
     private Exchange _exchange;
 
-
-    private int _receivedChunkCount = 0;
     private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
 
     // we keep both the original meta data object and the store reference to it just in case the
@@ -79,13 +78,20 @@ public class IncomingMessage implements 
     private MessageMetaData _messageMetaData;
 
     private StoredMessage<MessageMetaData> _storedMessageHandle;
+    private Object _connectionReference;
 
 
     public IncomingMessage(
             final MessagePublishInfo info
     )
     {
+        this(info, null);
+    }
+
+    public IncomingMessage(MessagePublishInfo info, Object reference)
+    {
         _messagePublishInfo = info;
+        _connectionReference = reference;
     }
 
     public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -124,12 +130,6 @@ public class IncomingMessage implements 
 
     }
 
-    public MessageMetaData headersReceived()
-    {
-
-        return headersReceived(System.currentTimeMillis());
-    }
-
     public MessageMetaData headersReceived(long currentTime)
     {
         _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -142,21 +142,15 @@ public class IncomingMessage implements 
         return _destinationQueues;
     }
 
-    public int addContentBodyFrame(final ContentChunk contentChunk)
-            throws AMQException
+    public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
     {
-        _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
-
-
-
-        return _receivedChunkCount++;
     }
 
     public boolean allContentReceived()
     {
-        return (_bodyLengthReceived == getContentHeader().bodySize);
+        return (_bodyLengthReceived == getContentHeader().getBodySize());
     }
 
     public AMQShortString getExchange()
@@ -217,7 +211,7 @@ public class IncomingMessage implements 
 
     public long getSize()
     {
-        return getContentHeader().bodySize;
+        return getContentHeader().getBodySize();
     }
 
     public long getMessageNumber()
@@ -251,18 +245,12 @@ public class IncomingMessage implements 
         return _expiration;
     }
 
-    public int getReceivedChunkCount()
-    {
-        return _receivedChunkCount;
-    }
-
-
     public int getBodyCount() throws AMQException
     {
         return _contentChunks.size();
     }
 
-    public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+    public ContentChunk getContentChunk(int index)
     {
         return _contentChunks.get(index);
     }
@@ -317,4 +305,14 @@ public class IncomingMessage implements 
     {
         return _storedMessageHandle;
     }
+
+    public Object getConnectionReference()
+    {
+        return _connectionReference;
+    }
+
+    public MessageMetaData getMessageMetaData()
+    {
+        return _messageMetaData;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Sat Mar 10 19:22:10 2012
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.ServerMessage;
 
 public enum NotificationCheck

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Sat Mar 10 19:22:10 2012
@@ -1,11 +1,11 @@
 package org.apache.qpid.server.queue;
 
-import java.util.Map;
-
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Map;
+
 public abstract class OutOfOrderQueue extends SimpleAMQQueue
 {
 
@@ -20,7 +20,7 @@ public abstract class OutOfOrderQueue ex
     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
     {
         // check that all subscriptions are not in advance of the entry
-        SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+        SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
         while(subIter.advance() && !entry.isAcquired())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
@@ -29,7 +29,7 @@ public abstract class OutOfOrderQueue ex
                 QueueContext context = (QueueContext) subscription.getQueueContext();
                 if(context != null)
                 {
-                    QueueEntry released = context._releasedEntry;
+                    QueueEntry released = context.getReleasedEntry();
                     while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
@@ -38,7 +38,7 @@ public abstract class OutOfOrderQueue ex
                         }
                         else
                         {
-                            released = context._releasedEntry;
+                            released = context.getReleasedEntry();
                         }
                     }
                 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sat Mar 10 19:22:10 2012
@@ -74,7 +74,7 @@ public class PriorityQueueList implement
         {
             final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
             int index;
-            for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
+            for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) {};
 
             while(next == null && index != 0)
             {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Sat Mar 10 19:22:10 2012
@@ -25,8 +25,8 @@ import java.util.concurrent.atomic.Atomi
 
 final class QueueContext implements AMQQueue.Context
 {
-    volatile QueueEntry _lastSeenEntry;
-    volatile QueueEntry _releasedEntry;
+    private volatile QueueEntry _lastSeenEntry;
+    private volatile QueueEntry _releasedEntry;
 
     static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
             _lastSeenUpdater =
@@ -46,4 +46,10 @@ final class QueueContext implements AMQQ
     {
         return _lastSeenEntry;
     }
+
+
+    QueueEntry getReleasedEntry()
+    {
+        return _releasedEntry;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sat Mar 10 19:22:10 2012
@@ -1,10 +1,8 @@
 package org.apache.qpid.server.queue;
 
-import java.util.Collection;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
 
 /*
 *

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sat Mar 10 19:22:10 2012
@@ -34,7 +34,6 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
@@ -416,11 +415,19 @@ public abstract class QueueEntryImpl imp
 
         if (alternateExchange != null)
         {
-            final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+            InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
+            List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
             final ServerMessage message = getMessage();
-            if (rerouteQueues != null && rerouteQueues.size() != 0)
+            if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
             {
+                queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+            }
+
 
+
+            if (queues != null && queues.size() != 0)
+            {
+                final List<? extends BaseQueue> rerouteQueues = queues;
                 ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
 
                 txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Sat Mar 10 19:22:10 2012
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Sat Mar 10 19:22:10 2012
@@ -20,17 +20,16 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.queue.QueueRunner;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.transport.TransportException;
+
 /**
  * QueueRunners are Runnables used to process a queue when requiring
  * asynchronous message delivery to subscriptions, which is necessary
@@ -47,7 +46,6 @@ public class QueueRunner implements Runn
     private static int SCHEDULED = 1;
     private static int RUNNING = 2;
 
-
     private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
 
     private final AtomicBoolean _stateChange = new AtomicBoolean();
@@ -55,8 +53,6 @@ public class QueueRunner implements Runn
     private final AtomicLong _lastRunAgain = new AtomicLong();
     private final AtomicLong _lastRunTime = new AtomicLong();
 
-    private long _continues;
-
     public QueueRunner(SimpleAMQQueue queue)
     {
         _queue = queue;
@@ -74,24 +70,35 @@ public class QueueRunner implements Runn
 
                 runAgain = _queue.processQueue(this);
             }
-            catch (AMQException e)
+            catch (final AMQException e)
             {
                 _logger.error("Exception during asynchronous delivery by " + toString(), e);
             }
-            finally
+            catch (final TransportException transe)
             {
-                CurrentActor.remove();
+                final String errorMessage = "Problem during asynchronous delivery by " + toString();
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug(errorMessage, transe);
+                }
+                else
+                {
+                    _logger.info(errorMessage + ' ' + transe.getMessage());
+                }
             }
-            _scheduled.compareAndSet(RUNNING, IDLE);
-            long stateChangeCount = _queue.getStateChangeCount();
-            _lastRunAgain.set(runAgain);
-            _lastRunTime.set(System.nanoTime());
-            if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+            finally
             {
-                _continues++;
-                if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+                CurrentActor.remove();
+                _scheduled.compareAndSet(RUNNING, IDLE);
+                final long stateChangeCount = _queue.getStateChangeCount();
+                _lastRunAgain.set(runAgain);
+                _lastRunTime.set(System.nanoTime());
+                if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
                 {
-                    _queue.execute(this);
+                    if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+                    {
+                        _queue.execute(this);
+                    }
                 }
             }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sat Mar 10 19:22:10 2012
@@ -27,12 +27,16 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.management.JMException;
+
+import javax.management.JMException;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
@@ -99,13 +103,10 @@ public class SimpleAMQQueue implements A
 
     private Exchange _alternateExchange;
 
-    /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
 
+    private final QueueEntryList<QueueEntry> _entries;
 
-
-    protected final QueueEntryList _entries;
-
-    protected final SubscriptionList _subscriptionList = new SubscriptionList();
+    private final SubscriptionList _subscriptionList = new SubscriptionList();
 
     private volatile Subscription _exclusiveSubscriber;
 
@@ -137,19 +138,19 @@ public class SimpleAMQQueue implements A
     private final AtomicInteger _bindingCountHigh = new AtomicInteger();
 
     /** max allowed size(KB) of a single message */
-    public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
+    private long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
 
     /** max allowed number of messages on a queue. */
-    public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
+    private long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
 
     /** max queue depth for the queue */
-    public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
+    private long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
 
     /** maximum message age before alerts occur */
-    public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
+    private long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
 
     /** the minimum interval between sending out consecutive alerts of the same type */
-    public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
+    private long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
 
     private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
 
@@ -167,7 +168,7 @@ public class SimpleAMQQueue implements A
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
+    private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -455,7 +456,10 @@ public class SimpleAMQQueue implements A
             }
         }
 
-        _activeSubscriberCount.incrementAndGet();
+        if(subscription.isActive())
+        {
+            _activeSubscriberCount.incrementAndGet();
+        }
         subscription.setStateListener(this);
         subscription.setQueueContext(new QueueContext(_entries.getHead()));
 
@@ -778,7 +782,9 @@ public class SimpleAMQQueue implements A
     private boolean mightAssign(final Subscription sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null || !sub.acquires())
+        {
             return true;
+        }
         Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
         return (assigned == null) || (assigned == sub);
     }
@@ -848,7 +854,7 @@ public class SimpleAMQQueue implements A
                 QueueContext context = (QueueContext) subscription.getQueueContext();
                 if(context != null)
                 {
-                    QueueEntry subnode = context._lastSeenEntry;
+                    QueueEntry subnode = context.getLastSeenEntry();
                     if(subnode.compareTo(entry)<0)
                     {
                         return false;
@@ -872,7 +878,7 @@ public class SimpleAMQQueue implements A
     private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
     {
         QueueContext subContext = (QueueContext) sub.getQueueContext();
-        QueueEntry releasedEntry = subContext._releasedEntry;
+        QueueEntry releasedEntry = subContext.getReleasedEntry();
 
         QueueContext._lastSeenUpdater.set(subContext, entry);
         if(releasedEntry == entry)
@@ -889,7 +895,7 @@ public class SimpleAMQQueue implements A
         {
             QueueEntry oldEntry;
 
-            while((oldEntry  = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
+            while((oldEntry  = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
             {
                 if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
                 {
@@ -1113,6 +1119,17 @@ public class SimpleAMQQueue implements A
         return _stateChangeCount.get();
     }
 
+    /** Gives subclasses access to this queue's entry list now that the {@code _entries} field is private. */
+    protected QueueEntryList getEntries()
+    {
+        return _entries;
+    }
+
+    protected SubscriptionList getSubscriptionList()
+    {
+        return _subscriptionList;
+    }
+
 
     public static interface QueueEntryFilter
     {
@@ -1226,19 +1243,10 @@ public class SimpleAMQQueue implements A
 
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
-                                           String queueName,
-                                           ServerTransaction txn) throws IllegalArgumentException
+                                           String destinationQueueName) throws IllegalArgumentException
     {
 
-        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        if (toQueue == null)
-        {
-            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
-        }
-        else if (toQueue == this)
-        {
-            throw new IllegalArgumentException("The destination queue cant be the same as the source queue");
-        }
+        final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -1258,65 +1266,68 @@ public class SimpleAMQQueue implements A
         });
 
 
-
-        // Move the messages in on the message store.
-        for (final QueueEntry entry : entries)
+        final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+        boolean shouldRollback = true;
+        try
         {
-            final ServerMessage message = entry.getMessage();
-            txn.enqueue(toQueue, message,
-                        new ServerTransaction.Action()
-                        {
-
-                            public void postCommit()
+            // Move the messages in on the message store.
+            for (final QueueEntry entry : entries)
+            {
+                final ServerMessage message = entry.getMessage();
+                txn.enqueue(toQueue, message,
+                            new ServerTransaction.Action()
                             {
-                                try
+
+                                public void postCommit()
                                 {
-                                    toQueue.enqueue(message);
+                                    try
+                                    {
+                                        toQueue.enqueue(message);
+                                    }
+                                    catch (AMQException e)
+                                    {
+                                        throw new RuntimeException(e);
+                                    }
                                 }
-                                catch (AMQException e)
+
+                                public void onRollback()
                                 {
-                                    throw new RuntimeException(e);
+                                    entry.release();
                                 }
-                            }
-
-                            public void onRollback()
+                            });
+                txn.dequeue(this, message,
+                            new ServerTransaction.Action()
                             {
-                                entry.release();
-                            }
-                        });
-            txn.dequeue(this, message,
-                        new ServerTransaction.Action()
-                        {
 
-                            public void postCommit()
-                            {
-                                entry.discard();
-                            }
-
-                            public void onRollback()
-                            {
+                                public void postCommit()
+                                {
+                                    entry.discard();
+                                }
 
-                            }
-                        });
+                                public void onRollback()
+                                {
 
+                                }
+                            });
+            }
+            txn.commit();
+            shouldRollback = false;
+        }
+        finally
+        {
+            if (shouldRollback)
+            {
+                txn.rollback();
+            }
         }
 
     }
 
     public void copyMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
-                                           String queueName,
-                                           final ServerTransaction txn) throws IllegalArgumentException
+                                           String destinationQueueName) throws IllegalArgumentException
     {
-        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        if (toQueue == null)
-        {
-            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
-        }
-        else if (toQueue == this)
-        {
-            throw new IllegalArgumentException("The destination queue cant be the same as the source queue");
-        }
+        final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -1334,36 +1345,63 @@ public class SimpleAMQQueue implements A
             }
         });
 
-
-        // Move the messages in on the message store.
-        for (QueueEntry entry : entries)
+        final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore());
+        boolean shouldRollback = true;
+        try
         {
-            final ServerMessage message = entry.getMessage();
-
-            txn.enqueue(toQueue, message, new ServerTransaction.Action()
+            // Copy the messages in on the message store.
+            for (QueueEntry entry : entries)
             {
-                public void postCommit()
+                final ServerMessage message = entry.getMessage();
+
+                txn.enqueue(toQueue, message, new ServerTransaction.Action()
                 {
-                    try
+                    public void postCommit()
                     {
-                        toQueue.enqueue(message);
+                        try
+                        {
+                            toQueue.enqueue(message);
+                        }
+                        catch (AMQException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
                     }
-                    catch (AMQException e)
+
+                    public void onRollback()
                     {
-                        throw new RuntimeException(e);
                     }
-                }
-
-                public void onRollback()
-                {
+                });
 
-                }
-            });
+            }
 
+            txn.commit();
+            shouldRollback = false;
+        }
+        finally
+        {
+            if (shouldRollback)
+            {
+                txn.rollback();
+            }
         }
 
     }
 
+    private AMQQueue getValidatedDestinationQueue(String queueName)
+    { // Resolves queueName via this virtual host's queue registry; returns the queue, or throws IllegalArgumentException in the two cases below.
+        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+        if (toQueue == null) // no queue is registered under that name
+        {
+            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
+        }
+        else if (toQueue == this) // the name resolves to this (source) queue itself
+        {
+            throw new IllegalArgumentException("The destination queue can't be the same as the source queue");
+        }
+        return toQueue;
+    }
+
     public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
     {
 
@@ -1543,10 +1581,16 @@ public class SimpleAMQQueue implements A
                 for(final QueueEntry entry : entries)
                 {
                     adapter.setEntry(entry);
-                    final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter);
+                    List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+                    if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
+                    {
+                        queues = _alternateExchange.getAlternateExchange().route(adapter);
+                    }
+
                     final ServerMessage message = entry.getMessage();
-                    if(rerouteQueues != null && rerouteQueues.size() != 0)
+                    if(queues != null && queues.size() != 0)
                     {
+                        final List<? extends BaseQueue> rerouteQueues = queues;
                         txn.enqueue(rerouteQueues, entry.getMessage(),
                                     new ServerTransaction.Action()
                                     {
@@ -1659,7 +1703,7 @@ public class SimpleAMQQueue implements A
                 //Overfull log message
                 _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
 
-                _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+                _blockedChannels.add(channel);
 
                 channel.block(this);
 
@@ -1692,11 +1736,10 @@ public class SimpleAMQQueue implements A
                     _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
                 }
 
-
-                for(AMQSessionModel c : _blockedChannels.keySet())
+                for(final AMQSessionModel blockedChannel : _blockedChannels)
                 {
-                    c.unblock(this);
-                    _blockedChannels.remove(c);
+                    blockedChannel.unblock(this);
+                    _blockedChannels.remove(blockedChannel);
                 }
             }
         }
@@ -1714,7 +1757,6 @@ public class SimpleAMQQueue implements A
 
     public void deliverAsync(Subscription sub)
     {
-        //_stateChangeCount.incrementAndGet();
         if(_exclusiveSubscriber == null)
         {
             deliverAsync();
@@ -1890,8 +1932,8 @@ public class SimpleAMQQueue implements A
         QueueContext context = (QueueContext) sub.getQueueContext();
         if(context != null)
         {
-            QueueEntry lastSeen = context._lastSeenEntry;
-            QueueEntry releasedNode = context._releasedEntry;
+            QueueEntry lastSeen = context.getLastSeenEntry();
+            QueueEntry releasedNode = context.getReleasedEntry();
 
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
@@ -1913,8 +1955,8 @@ public class SimpleAMQQueue implements A
                     QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
                 }
 
-                lastSeen = context._lastSeenEntry;
-                releasedNode = context._releasedEntry;
+                lastSeen = context.getLastSeenEntry();
+                releasedNode = context.getReleasedEntry();
                 node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
             }
             return node;
@@ -1930,8 +1972,8 @@ public class SimpleAMQQueue implements A
         QueueContext context = (QueueContext) sub.getQueueContext();
         if(context != null)
         {
-            QueueEntry releasedNode = context._releasedEntry;
-            return releasedNode == null || releasedNode.compareTo(entry) < 0;
+            QueueEntry releasedNode = context.getReleasedEntry();
+            return releasedNode != null && releasedNode.compareTo(entry) < 0;
         }
         else
         {
@@ -2255,8 +2297,7 @@ public class SimpleAMQQueue implements A
 
         public boolean equals(Object o)
         {
-            return o != null
-                    && o instanceof SimpleAMQQueue.QueueEntryListener
+            return o instanceof SimpleAMQQueue.QueueEntryListener // instanceof is false for null, so no explicit null check is needed
                     && _sub == ((QueueEntryListener) o)._sub;
         }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java Sat Mar 10 19:22:10 2012
@@ -22,9 +22,16 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
 
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
 public class SimpleQueueEntryImpl extends QueueEntryImpl
 {
-    volatile SimpleQueueEntryImpl _next;
+    static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
+                _nextUpdater =
+            AtomicReferenceFieldUpdater.newUpdater
+            (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
+
+    private volatile SimpleQueueEntryImpl _next;
 
     public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList)
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sat Mar 10 19:22:10 2012
@@ -20,9 +20,11 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.ServerMessage;
+
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.apache.qpid.server.message.ServerMessage;
 
 public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl>
 {
@@ -40,12 +42,11 @@ public class SimpleQueueEntryList implem
     private final AMQQueue _queue;
 
     static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
-                _nextUpdater =
-            AtomicReferenceFieldUpdater.newUpdater
-            (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
+                _nextUpdater = SimpleQueueEntryImpl._nextUpdater;
 
     private AtomicLong _scavenges = new AtomicLong(0L);
     private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+    private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
 
 
     public SimpleQueueEntryList(AMQQueue queue)
@@ -55,28 +56,17 @@ public class SimpleQueueEntryList implem
         _tail = _head;
     }
 
-    void advanceHead()
-    {
-        SimpleQueueEntryImpl next = _head.getNextNode();
-        SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
-        if (next == newNext)
-        {
-            if (_scavenges.incrementAndGet() > _scavengeCount)
-            {
-                _scavenges.set(0L);
-                scavenge();
-            }
-        }
-    }
-
     void scavenge()
     {
+        SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null); // take the recorded high-water mark and reset it to null in one atomic step
         SimpleQueueEntryImpl next = _head.getNextValidEntry();
 
-        while (next != null)
+        if(hwm != null) // skip the walk entirely when no mark is currently set
         {
-            next = next.getNextValidEntry();
+            while (next != null && hwm.compareTo(next)>0) // step through valid entries only while hwm compares greater than the current one
+            {
+                next = next.getNextValidEntry();
+            }
         }
     }
 
@@ -126,7 +116,6 @@ public class SimpleQueueEntryList implem
 
     public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
     {
-
         private SimpleQueueEntryImpl _lastNode;
 
         QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
@@ -134,10 +123,9 @@ public class SimpleQueueEntryList implem
             _lastNode = startNode;
         }
 
-
         public boolean atTail()
         {
-            return _lastNode.getNextNode() == null;
+            return _lastNode.getNextValidEntry() == null; // at the tail when getNextValidEntry() finds nothing after the current node
         }
 
         public SimpleQueueEntryImpl getNode()
@@ -147,28 +135,17 @@ public class SimpleQueueEntryList implem
 
         public boolean advance()
         {
+            SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry(); // fetch the successor once so the test and the move use the same node
 
-            if(!atTail())
+            if(nextValidNode != null) // only move when a successor exists; otherwise stay on the current node
             {
-                SimpleQueueEntryImpl nextNode = _lastNode.getNextNode();
-                while(nextNode.isDispensed() && nextNode.getNextNode() != null)
-                {
-                    nextNode = nextNode.getNextNode();
-                }
-                _lastNode = nextNode;
-                return true;
-
-            }
-            else
-            {
-                return false;
+                _lastNode = nextValidNode;
             }
 
+            return nextValidNode != null; // true if the iterator moved, false if there was no valid successor
         }
-
     }
 
-
     public QueueEntryIteratorImpl iterator()
     {
         return new QueueEntryIteratorImpl(_head);
@@ -182,7 +159,32 @@ public class SimpleQueueEntryList implem
 
     public void entryDeleted(SimpleQueueEntryImpl queueEntry)
     {
-        advanceHead();
+        SimpleQueueEntryImpl next = _head.getNextNode();
+        SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
+
+        // The node directly after the head is still the next valid entry, so the head of the queue has not been deleted and the deletion must have been mid queue.
+        if (next == newNext)
+        {
+            SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+            while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0) // retry until the mark is set and does not compare below queueEntry
+            {
+                _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry); // if the CAS fails (value changed concurrently), the re-read below decides whether to retry
+                unscavengedHWM = _unscavengedHWM.get();
+            }
+            if (_scavenges.incrementAndGet() > _scavengeCount) // once more than _scavengeCount such deletions are counted, reset the counter and scavenge
+            {
+                _scavenges.set(0L);
+                scavenge();
+            }
+        }
+        else // next != newNext: the node directly after the head is not the next valid entry
+        {
+            SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+            if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0)) // clear the mark when it compares below the head's successor, or there is no successor
+            {
+                _unscavengedHWM.compareAndSet(unscavengedHWM, null);
+            }
+        }
     }
 
     public int getPriorities()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Sat Mar 10 19:22:10 2012
@@ -19,11 +19,12 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.Map;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Map;
+
 public class SortedQueue extends OutOfOrderQueue
 {
     //Lock object to synchronize enqueue. Used instead of the object

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Sat Mar 10 19:22:10 2012
@@ -21,12 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
-import org.apache.qpid.server.store.StoreContext;
 
 /**
  * A sorted implementation of QueueEntryList.
@@ -367,7 +362,7 @@ public class SortedQueueEntryList implem
 
             if(chosenChild != null)
             {
-                // we have one child (x), we can move it up to replace x;
+                // we have one child (x), we can move it up to replace x
                 chosenChild.setParent(entry.getParent());
                 if(chosenChild.getParent() == null)
                 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Sat Mar 10 19:22:10 2012
@@ -21,14 +21,16 @@ package org.apache.qpid.server.queue;
  */
 
 
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.transport.TransportException;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 class SubFlushRunner implements Runnable
@@ -67,18 +69,30 @@ class SubFlushRunner implements Runnable
             }
             catch (AMQException e)
             {
-                _logger.error(e);
+                _logger.error("Exception during asynchronous delivery by " + toString(), e);
             }
-            finally
+            catch (final TransportException transe)
             {
-                CurrentActor.remove();
+                final String errorMessage = "Problem during asynchronous delivery by " + toString();
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug(errorMessage, transe);
+                }
+                else
+                {
+                    _logger.info(errorMessage + ' ' + transe.getMessage());
+                }
             }
-            _scheduled.compareAndSet(RUNNING, IDLE);
-            if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+            finally
             {
-                if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+                CurrentActor.remove();
+                _scheduled.compareAndSet(RUNNING, IDLE);
+                if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
                 {
-                    getQueue().execute(this);
+                    if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+                    {
+                        getQueue().execute(this);
+                    }
                 }
             }
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org