You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/15 14:54:37 UTC

svn commit: r1214760 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/flow/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/subs...

Author: rgodfrey
Date: Thu Dec 15 13:54:36 2011
New Revision: 1214760

URL: http://svn.apache.org/viewvc?rev=1214760&view=rev
Log:
QPID-3687 : Improve Java Broker performance

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java Thu Dec 15 13:54:36 2011
@@ -45,6 +45,15 @@ public class WindowCreditManager extends
 
     }
 
+    public long getBytesCreditLimit()
+    {
+        return _bytesCreditLimit;
+    }
+
+    public long getMessageCreditLimit()
+    {
+        return _messageCreditLimit;
+    }
 
     public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Dec 15 13:54:36 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -47,7 +48,6 @@ import org.apache.qpid.AMQConnectionExce
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
@@ -87,16 +87,20 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.io.IoSender;
 
 public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
 {
@@ -139,7 +143,7 @@ public class AMQProtocolEngine implement
 
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-
+    private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
@@ -173,6 +177,9 @@ public class AMQProtocolEngine implement
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
 
+    private volatile boolean _deferFlush;
+    private long _lastReceivedTime;
+
     public ManagedObject getManagedObject()
     {
         return _managedObject;
@@ -240,14 +247,29 @@ public class AMQProtocolEngine implement
         return _closing.get();
     }
 
+    public synchronized void flushBatched()
+    {
+        _sender.flush();
+    }
+
+
+    public ClientDeliveryMethod createDeliveryMethod(int channelId)
+    {
+        return new WriteDeliverMethod(channelId);
+    }
+
     public void received(final ByteBuffer msg)
     {
-        _lastIoTime = System.currentTimeMillis();
+        final long arrivalTime = System.currentTimeMillis();
+        _lastReceivedTime = arrivalTime;
+        _lastIoTime = arrivalTime;
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            for (AMQDataBlock dataBlock : dataBlocks)
+            final int len = dataBlocks.size();
+            for (int i = 0; i < len; i++)
             {
+                AMQDataBlock dataBlock = dataBlocks.get(i);
                 try
                 {
                     dataBlockReceived(dataBlock);
@@ -347,7 +369,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    private void protocolInitiationReceived(ProtocolInitiation pi)
+    private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
     {
         // this ensures the codec never checks for a PI message again
         (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
@@ -524,12 +546,15 @@ public class AMQProtocolEngine implement
      */
     public synchronized void writeFrame(AMQDataBlock frame)
     {
-        _lastSent = frame;
+
         final ByteBuffer buf = asByteBuffer(frame);
-        _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
         _sender.send(buf);
-        _sender.flush();
+        _lastIoTime = System.currentTimeMillis();
+        if(!_deferFlush)
+        {
+            _sender.flush();
+        }
     }
 
     public AMQShortString getContextKey()
@@ -918,7 +943,7 @@ public class AMQProtocolEngine implement
     private void setProtocolVersion(ProtocolVersion pv)
     {
         _protocolVersion = pv;
-
+        _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
         _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
         _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
     }
@@ -1023,7 +1048,7 @@ public class AMQProtocolEngine implement
 
     public MethodRegistry getMethodRegistry()
     {
-        return MethodRegistry.getMethodRegistry(getProtocolVersion());
+        return _methodRegistry;
     }
 
     public MethodDispatcher getMethodDispatcher()
@@ -1052,7 +1077,7 @@ public class AMQProtocolEngine implement
         // Nothing
     }
 
-    public void writerIdle()
+    public synchronized void writerIdle()
     {
         _sender.send(asByteBuffer(HeartbeatBody.FRAME));
     }
@@ -1109,6 +1134,11 @@ public class AMQProtocolEngine implement
         return _lastIoTime;
     }
 
+    public long getLastReceivedTime()
+    {
+        return _lastReceivedTime;
+    }
+
     public ProtocolSessionIdentifier getSessionIdentifier()
     {
         return _sessionIdentifier;
@@ -1402,9 +1432,215 @@ public class AMQProtocolEngine implement
         return true;
     }
 
+    public void setDeferFlush(boolean deferFlush)
+    {
+        _deferFlush = deferFlush;
+    }
+
+
+
     @Override
     public String getUserName()
     {
         return getAuthorizedPrincipal().getName();
     }
+
+    private static class ByteBufferOutputStream extends OutputStream
+    {
+
+
+        private final ByteBuffer _buf;
+
+        public ByteBufferOutputStream(ByteBuffer buf)
+        {
+            _buf = buf;
+        }
+
+        @Override
+        public void write(int b) throws IOException
+        {
+            _buf.put((byte) b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+            _buf.put(b, off, len);
+        }
+    }
+
+    public final class WriteDeliverMethod
+            implements ClientDeliveryMethod
+    {
+        private final int _channelId;
+
+        public WriteDeliverMethod(int channelId)
+        {
+            _channelId = channelId;
+        }
+
+        public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+                throws AMQException
+        {
+            registerMessageDelivered(entry.getMessage().getSize());
+            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, sub.getConsumerTag());
+            entry.incrementDeliveryCount();
+        }
+
+    }
+
+    private static class BytesDataOutput implements DataOutput
+    {
+        int _pos = 0;
+        byte[] _buf;
+
+        public BytesDataOutput(byte[] buf)
+        {
+            _buf = buf;
+        }
+
+        public void setBuffer(byte[] buf)
+        {
+            _buf = buf;
+            _pos = 0;
+        }
+
+        public void reset()
+        {
+            _pos = 0;
+        }
+
+        public int length()
+        {
+            return _pos;
+        }
+
+        public void write(int b)
+        {
+            _buf[_pos++] = (byte) b;
+        }
+
+        public void write(byte[] b)
+        {
+            System.arraycopy(b, 0, _buf, _pos, b.length);
+            _pos+=b.length;
+        }
+
+
+        public void write(byte[] b, int off, int len)
+        {
+            System.arraycopy(b, off, _buf, _pos, len);
+            _pos+=len;
+
+        }
+
+        public void writeBoolean(boolean v)
+        {
+            _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+        }
+
+        public void writeByte(int v)
+        {
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeShort(int v)
+        {
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeChar(int v)
+        {
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeInt(int v)
+        {
+            _buf[_pos++] = (byte) (v >>> 24);
+            _buf[_pos++] = (byte) (v >>> 16);
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeLong(long v)
+        {
+            _buf[_pos++] = (byte) (v >>> 56);
+            _buf[_pos++] = (byte) (v >>> 48);
+            _buf[_pos++] = (byte) (v >>> 40);
+            _buf[_pos++] = (byte) (v >>> 32);
+            _buf[_pos++] = (byte) (v >>> 24);
+            _buf[_pos++] = (byte) (v >>> 16);
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte)v;
+        }
+
+        public void writeFloat(float v)
+        {
+            writeInt(Float.floatToIntBits(v));
+        }
+
+        public void writeDouble(double v)
+        {
+            writeLong(Double.doubleToLongBits(v));
+        }
+
+        public void writeBytes(String s)
+        {
+            int len = s.length();
+            for (int i = 0 ; i < len ; i++)
+            {
+                _buf[_pos++] = ((byte)s.charAt(i));
+            }
+        }
+
+        public void writeChars(String s)
+        {
+            int len = s.length();
+            for (int i = 0 ; i < len ; i++)
+            {
+                int v = s.charAt(i);
+                _buf[_pos++] = (byte) (v >>> 8);
+                _buf[_pos++] = (byte) v;
+            }
+        }
+
+        public void writeUTF(String s)
+        {
+            int strlen = s.length();
+
+            int pos = _pos;
+            _pos+=2;
+
+
+            for (int i = 0; i < strlen; i++)
+            {
+                int c = s.charAt(i);
+                if ((c >= 0x0001) && (c <= 0x007F))
+                {
+                    c = s.charAt(i);
+                    _buf[_pos++] = (byte) c;
+
+                }
+                else if (c > 0x07FF)
+                {
+                    _buf[_pos++]  = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                    _buf[_pos++]  = (byte) (0x80 | ((c >>  6) & 0x3F));
+                    _buf[_pos++]  = (byte) (0x80 | (c & 0x3F));
+                }
+                else
+                {
+                    _buf[_pos++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+                    _buf[_pos++]  = (byte) (0x80 | (c & 0x3F));
+                }
+            }
+
+            int len = _pos - (pos + 2);
+
+            _buf[pos++] = (byte) (len >>> 8);
+            _buf[pos] = (byte) len;
+        }
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Dec 15 13:54:36 2011
@@ -32,6 +32,7 @@ import org.apache.qpid.server.AMQChannel
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
@@ -49,6 +50,14 @@ public interface AMQProtocolSession exte
 
     boolean isClosing();
 
+    void flushBatched();
+
+    void setDeferFlush(boolean defer);
+
+    ClientDeliveryMethod createDeliveryMethod(int channelId);
+
+    long getLastReceivedTime();
+
     public static final class ProtocolSessionIdentifier
     {
         private final Object _sessionIdentifier;
@@ -77,15 +86,6 @@ public interface AMQProtocolSession exte
     }
 
     /**
-     * Called when a protocol data block is received
-     *
-     * @param message the data block that has been received
-     *
-     * @throws Exception if processing the datablock fails
-     */
-    void dataBlockReceived(AMQDataBlock message) throws Exception;
-
-    /**
      * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
      * 6).
      *
@@ -234,4 +234,5 @@ public interface AMQProtocolSession exte
     List<AMQChannel> getChannels();
 
     void mgmtCloseChannel(int channelId);
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Thu Dec 15 13:54:36 2011
@@ -100,6 +100,12 @@ public class ProtocolEngine_0_10  extend
         return _network.getLocalAddress();
     }
 
+    public void received(final ByteBuffer buf)
+    {
+        super.received(buf);
+        _connection.receivedComplete();
+    }
+
     public long getReadBytes()
     {
         return _readBytes;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Thu Dec 15 13:54:36 2011
@@ -27,6 +27,11 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.queue.QueueRunner;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * QueueRunners are Runnables used to process a queue when requiring
  * asynchronous message delivery to subscriptions, which is necessary
@@ -37,33 +42,64 @@ public class QueueRunner implements Read
 {
     private static final Logger _logger = Logger.getLogger(QueueRunner.class);
 
-    private final String _name;
     private final SimpleAMQQueue _queue;
 
-    public QueueRunner(SimpleAMQQueue queue, long count)
+    private static int IDLE = 0;
+    private static int SCHEDULED = 1;
+    private static int RUNNING = 2;
+
+
+    private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
+
+    private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+    private final AtomicBoolean _stateChange = new AtomicBoolean();
+
+    private final AtomicLong _lastRunAgain = new AtomicLong();
+    private final AtomicLong _lastRunTime = new AtomicLong();
+
+    private long _runs;
+    private long _continues;
+
+    public QueueRunner(SimpleAMQQueue queue)
     {
         _queue = queue;
-        _name = "QueueRunner-" + count + "-" + queue.getLogActor();
     }
 
+    private int trouble = 0;
+
     public void run()
     {
-        String originalName = Thread.currentThread().getName();
-        try
+        if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
         {
-            Thread.currentThread().setName(_name);
-            CurrentActor.set(_queue.getLogActor());
+            long runAgain = Long.MIN_VALUE;
+            _stateChange.set(false);
+            try
+            {
+                CurrentActor.set(_queue.getLogActor());
+
+                runAgain = _queue.processQueue(this);
+            }
+            catch (AMQException e)
+            {
+                _logger.error("Exception during asynchronous delivery by " + toString(), e);
+            }
+            finally
+            {
+                CurrentActor.remove();
+            }
+            _scheduled.compareAndSet(RUNNING, IDLE);
+            long stateChangeCount = _queue.getStateChangeCount();
+            _lastRunAgain.set(runAgain);
+            _lastRunTime.set(System.nanoTime());
+            if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+            {
+                _continues++;
+                if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+                {
+                    _queue.execute(this);
+                }
+            }
 
-            _queue.processQueue(this);
-        }
-        catch (AMQException e)
-        {
-            _logger.error("Exception during asynchronous delivery by " + _name, e);
-        }
-        finally
-        {
-            CurrentActor.remove();
-            Thread.currentThread().setName(originalName);
         }
     }
 
@@ -79,6 +115,21 @@ public class QueueRunner implements Read
 
     public String toString()
     {
-        return _name;
+        return "QueueRunner-" + _queue.getLogActor();
+    }
+
+    public void execute(Executor executor)
+    {
+        _stateChange.set(true);
+        if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+        {
+            executor.execute(this);
+        }
+    }
+
+    public boolean isIdle()
+    {
+        return _scheduled.get() == IDLE;
     }
-}
\ No newline at end of file
+
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Dec 15 13:54:36 2011
@@ -155,11 +155,11 @@ public class SimpleAMQQueue implements A
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
 
-    static final int MAX_ASYNC_DELIVERIES = 10;
+    static final int MAX_ASYNC_DELIVERIES = 80;
 
 
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
-    private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null);
+
     private final Executor _asyncDelivery;
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
@@ -584,33 +584,20 @@ public class SimpleAMQQueue implements A
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
+
         incrementTxnEnqueueStats(message);
         incrementQueueCount();
         incrementQueueSize(message);
+
         _totalMessagesReceived.incrementAndGet();
 
 
         QueueEntry entry;
-        Subscription exclusiveSub = _exclusiveSubscriber;
+        final Subscription exclusiveSub = _exclusiveSubscriber;
+        entry = _entries.add(message);
 
-        if (exclusiveSub != null)
+        if(action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
         {
-            exclusiveSub.getSendLock();
-
-            try
-            {
-                entry = _entries.add(message);
-
-                deliverToSubscription(exclusiveSub, entry);
-            }
-            finally
-            {
-                exclusiveSub.releaseSendLock();
-            }
-        }
-        else
-        {
-            entry = _entries.add(message);
             /*
 
             iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
@@ -666,7 +653,14 @@ public class SimpleAMQQueue implements A
         {
             checkSubscriptionsNotAheadOfDelivery(entry);
 
-            deliverAsync();
+            if (exclusiveSub != null)
+            {
+                deliverAsync(exclusiveSub);
+            }
+            else
+            {
+                deliverAsync();
+           }
         }
 
         if(_managedObject != null)
@@ -685,30 +679,32 @@ public class SimpleAMQQueue implements A
             throws AMQException
     {
 
-        sub.getSendLock();
-        try
+        if(sub.trySendLock())
         {
-            if (subscriptionReadyAndHasInterest(sub, entry)
-                && !sub.isSuspended())
+            try
             {
-                if (!sub.wouldSuspend(entry))
+                if (subscriptionReadyAndHasInterest(sub, entry)
+                    && !sub.isSuspended())
                 {
-                    if (sub.acquires() && !entry.acquire(sub))
-                    {
-                        // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                        // to acquire the entry for this subscription
-                        sub.restoreCredit(entry);
-                    }
-                    else
+                    if (!sub.wouldSuspend(entry))
                     {
-                        deliverMessage(sub, entry);
+                        if (sub.acquires() && !entry.acquire(sub))
+                        {
+                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+                            // to acquire the entry for this subscription
+                            sub.restoreCredit(entry);
+                        }
+                        else
+                        {
+                            deliverMessage(sub, entry, false);
+                        }
                     }
                 }
             }
-        }
-        finally
-        {
-            sub.releaseSendLock();
+            finally
+            {
+                sub.releaseSendLock();
+            }
         }
     }
 
@@ -752,7 +748,7 @@ public class SimpleAMQQueue implements A
         _byteTxnDequeues.addAndGet(entry.getSize());
     }
 
-    private void deliverMessage(final Subscription sub, final QueueEntry entry)
+    private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
             throws AMQException
     {
         setLastSeenEntry(sub, entry);
@@ -760,7 +756,7 @@ public class SimpleAMQQueue implements A
         _deliveredMessages.incrementAndGet();
         incrementUnackedMsgCount();
 
-        sub.send(entry);
+        sub.send(entry, batch);
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -866,7 +862,7 @@ public class SimpleAMQQueue implements A
         {
             if (!subscription.isClosed())
             {
-                deliverMessage(subscription, entry);
+                deliverMessage(subscription, entry, false);
                 return true;
             }
             else
@@ -1008,6 +1004,12 @@ public class SimpleAMQQueue implements A
         _exclusiveSubscriber = exclusiveSubscriber;
     }
 
+    long getStateChangeCount()
+    {
+        return _stateChangeCount.get();
+    }
+
+
     public static interface QueueEntryFilter
     {
         public boolean accept(QueueEntry entry);
@@ -1308,7 +1310,7 @@ public class SimpleAMQQueue implements A
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
-        ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
         while (queueListIterator.advance())
         {
@@ -1331,7 +1333,7 @@ public class SimpleAMQQueue implements A
 
     private void dequeueEntry(final QueueEntry node)
     {
-        ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
         dequeueEntry(node, txn);
     }
 
@@ -1408,7 +1410,7 @@ public class SimpleAMQQueue implements A
                 }
             });
 
-            ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+            ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
             if(_alternateExchange != null)
             {
@@ -1577,26 +1579,34 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    private QueueRunner _queueRunner = new QueueRunner(this);
 
     public void deliverAsync()
     {
-        QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+        _stateChangeCount.incrementAndGet();
+
+        _queueRunner.execute(_asyncDelivery);
 
-        if (_asynchronousRunner.compareAndSet(null, runner))
-        {
-            _asyncDelivery.execute(runner);
-        }
     }
 
     public void deliverAsync(Subscription sub)
     {
-        SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
-        if(flusher == null)
+        //_stateChangeCount.incrementAndGet();
+        if(_exclusiveSubscriber == null)
         {
-            flusher = new SubFlushRunner(sub);
-            sub.set(SUB_FLUSH_RUNNER, flusher);
+            deliverAsync();
         }
-        _asyncDelivery.execute(flusher);
+        else
+        {
+            SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+            if(flusher == null)
+            {
+                flusher = new SubFlushRunner(sub);
+                sub.set(SUB_FLUSH_RUNNER, flusher);
+            }
+            flusher.execute(_asyncDelivery);
+        }
+
     }
 
     public void flushSubscription(Subscription sub) throws AMQException
@@ -1612,31 +1622,56 @@ public class SimpleAMQQueue implements A
     public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
     {
         boolean atTail = false;
+        final boolean keepSendLockHeld = iterations <=  SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
 
-        while (!sub.isSuspended() && !atTail && iterations != 0)
+        try
         {
-            try
+            if(keepSendLockHeld)
             {
                 sub.getSendLock();
-                atTail = attemptDelivery(sub);
-                if (atTail && sub.isAutoClose())
+            }
+            while (!sub.isSuspended() && !atTail && iterations != 0)
+            {
+                try
                 {
-                    unregisterSubscription(sub);
+                    if(!keepSendLockHeld)
+                    {
+                        sub.getSendLock();
+                    }
 
-                    sub.confirmAutoClose();
+                    atTail = attemptDelivery(sub, true);
+                    if (atTail && !sub.isSuspended() && sub.isAutoClose())
+                    {
+                        unregisterSubscription(sub);
+
+                        sub.confirmAutoClose();
 
+                    }
+                    else if (!atTail)
+                    {
+                        iterations--;
+                    }
                 }
-                else if (!atTail)
+                finally
                 {
-                    iterations--;
+                    if(!keepSendLockHeld)
+                    {
+                        sub.releaseSendLock();
+                    }
                 }
             }
-            finally
+        }
+        finally
+        {
+            if(keepSendLockHeld)
             {
                 sub.releaseSendLock();
             }
+            sub.flushBatched();
+
         }
 
+
         // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
         // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
         // which would give us memory "leak".
@@ -1653,11 +1688,13 @@ public class SimpleAMQQueue implements A
      *
      * Looks up the next node for the subscription and attempts to deliver it.
      *
+     *
      * @param sub
+     * @param batch
      * @return true if we have completed all possible deliveries for this sub.
      * @throws AMQException
      */
-    private boolean attemptDelivery(Subscription sub) throws AMQException
+    private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
     {
         boolean atTail = false;
 
@@ -1681,7 +1718,7 @@ public class SimpleAMQQueue implements A
                         }
                         else
                         {
-                            deliverMessage(sub, node);
+                            deliverMessage(sub, node, batch);
                         }
 
                     }
@@ -1785,23 +1822,26 @@ public class SimpleAMQQueue implements A
      * @param runner the Runner to schedule
      * @throws AMQException
      */
-    public void processQueue(QueueRunner runner) throws AMQException
+    public long processQueue(QueueRunner runner) throws AMQException
     {
-        long stateChangeCount;
+        long stateChangeCount = Long.MIN_VALUE;
         long previousStateChangeCount = Long.MIN_VALUE;
+        long rVal = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
         boolean lastLoop = false;
         int iterations = MAX_ASYNC_DELIVERIES;
 
-        _asynchronousRunner.compareAndSet(runner, null);
+        final int numSubs = _subscriptionList.size();
+
+        final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
 
         // For every message enqueue/requeue the we fire deliveryAsync() which
         // increases _stateChangeCount. If _sCC changes whilst we are in our loop
         // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
         // then we will continue to run for a maximum of iterations.
         // So whilst delivery/rejection is going on a processQueue thread will be running
-        while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
+        while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
         {
             // we want to have one extra loop after every subscription has reached the point where it cannot move
             // further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1812,6 +1852,7 @@ public class SimpleAMQQueue implements A
                 //further asynchronous delivery is required since the
                 //previous loop. keep going if iteration slicing allows.
                 lastLoop = false;
+                rVal = stateChangeCount;
             }
 
             previousStateChangeCount = stateChangeCount;
@@ -1824,33 +1865,47 @@ public class SimpleAMQQueue implements A
             {
                 Subscription sub = subscriptionIter.getNode().getSubscription();
                 sub.getSendLock();
-                try
-                {
-                    //attempt delivery. returns true if no further delivery currently possible to this sub
-                    subscriptionDone = attemptDelivery(sub);
-                    if (subscriptionDone)
+
+                    try
                     {
-                        //close autoClose subscriptions if we are not currently intent on continuing
-                        if (lastLoop && sub.isAutoClose())
+                        for(int i = 0 ; i < perSub; i++)
                         {
-                            unregisterSubscription(sub);
+                            //attempt delivery. returns true if no further delivery currently possible to this sub
+                            subscriptionDone = attemptDelivery(sub, true);
+                            if (subscriptionDone)
+                            {
+                                sub.flushBatched();
+                                //close autoClose subscriptions if we are not currently intent on continuing
+                                if (lastLoop && !sub.isSuspended() && sub.isAutoClose())
+                                {
+
+                                    unregisterSubscription(sub);
+
+                                    sub.confirmAutoClose();
+                                }
+                                break;
+                            }
+                            else
+                            {
+                                //this subscription can accept additional deliveries, so we must
+                                //keep going after this (if iteration slicing allows it)
+                                allSubscriptionsDone = false;
+                                lastLoop = false;
+                                if(--iterations == 0)
+                                {
+                                    sub.flushBatched();
+                                    break;
+                                }
+                            }
 
-                            sub.confirmAutoClose();
                         }
+
+                        sub.flushBatched();
                     }
-                    else
+                    finally
                     {
-                        //this subscription can accept additional deliveries, so we must 
-                        //keep going after this (if iteration slicing allows it)
-                        allSubscriptionsDone = false;
-                        lastLoop = false;
-                        iterations--;
+                        sub.releaseSendLock();
                     }
-                }
-                finally
-                {
-                    sub.releaseSendLock();
-                }
             }
 
             if(allSubscriptionsDone && lastLoop)
@@ -1876,24 +1931,24 @@ public class SimpleAMQQueue implements A
                 deliveryIncomplete = true;
             }
 
-            _asynchronousRunner.set(null);
         }
 
         // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
-        if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
+        if (iterations == 0)
         {
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rescheduling runner:" + runner);
             }
-            _asyncDelivery.execute(runner);
+            return 0L;
         }
+        return rVal;
+
     }
 
     public void checkMessageStatus() throws AMQException
     {
-
         QueueEntryIterator queueListIterator = _entries.iterator();
 
         while (queueListIterator.advance())

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Thu Dec 15 13:54:36 2011
@@ -27,6 +27,10 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 class SubFlushRunner implements ReadWriteRunnable
 {
@@ -34,29 +38,33 @@ class SubFlushRunner implements ReadWrit
 
 
     private final Subscription _sub;
-    private final String _name;
+
+    private static int IDLE = 0;
+    private static int SCHEDULED = 1;
+    private static int RUNNING = 2;
+
+
+    private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
+
+
     private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+    private final AtomicBoolean _stateChange = new AtomicBoolean();
 
     public SubFlushRunner(Subscription sub)
     {
         _sub = sub;
-        _name = "SubFlushRunner-"+_sub;
     }
 
     public void run()
     {
-
-        String originalName = Thread.currentThread().getName();
-        try
+        if(_scheduled.compareAndSet(SCHEDULED, RUNNING))
         {
-            Thread.currentThread().setName(_name);
-
             boolean complete = false;
+            _stateChange.set(false);
             try
             {
                 CurrentActor.set(_sub.getLogActor());
                 complete = getQueue().flushSubscription(_sub, ITERATIONS);
-
             }
             catch (AMQException e)
             {
@@ -66,17 +74,15 @@ class SubFlushRunner implements ReadWrit
             {
                 CurrentActor.remove();
             }
-            if (!complete && !_sub.isSuspended())
+            _scheduled.compareAndSet(RUNNING, IDLE);
+            if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
             {
-                getQueue().execute(this);
+                if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+                {
+                    getQueue().execute(this);
+                }
             }
-
         }
-        finally
-        {
-            Thread.currentThread().setName(originalName);
-        }
-
     }
 
     private SimpleAMQQueue getQueue()
@@ -93,4 +99,18 @@ class SubFlushRunner implements ReadWrit
     {
         return true;
     }
+
+    public String toString()
+    {
+        return "SubFlushRunner-" + _sub.getLogActor();
+    }
+
+    public void execute(Executor executor)
+    {
+        _stateChange.set(true);
+        if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+        {
+            executor.execute(this);
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu Dec 15 13:54:36 2011
@@ -73,13 +73,18 @@ public interface Subscription
 
     void close();
 
-    void send(QueueEntry msg) throws AMQException;
+    void send(QueueEntry entry, boolean batch) throws AMQException;
+
+    void flushBatched();
 
     void queueDeleted(AMQQueue queue);
 
 
     boolean wouldSuspend(QueueEntry msg);
 
+    boolean trySendLock();
+
+
     void getSendLock();
 
     void releaseSendLock();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu Dec 15 13:54:36 2011
@@ -119,11 +119,13 @@ public abstract class SubscriptionImpl i
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
          *
-         * @param msg   The message to send
+         *
+         * @param entry
+         * @param batch
          * @throws AMQException
          */
         @Override
-        public void send(QueueEntry msg) throws AMQException
+        public void send(QueueEntry entry, boolean batch) throws AMQException
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
@@ -131,7 +133,7 @@ public abstract class SubscriptionImpl i
             synchronized (getChannel())
             {
                 long deliveryTag = getChannel().getNextDeliveryTag();
-                sendToClient(msg, deliveryTag);
+                sendToClient(entry, deliveryTag);
             }
 
         }
@@ -173,11 +175,13 @@ public abstract class SubscriptionImpl i
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
          *
+         *
          * @param entry   The message to send
+         * @param batch
          * @throws AMQException
          */
         @Override
-        public void send(QueueEntry entry) throws AMQException
+        public void send(QueueEntry entry, boolean batch) throws AMQException
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -193,6 +197,7 @@ public abstract class SubscriptionImpl i
 
             synchronized (getChannel())
             {
+                getChannel().getProtocolSession().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
                 sendToClient(entry, deliveryTag);
@@ -263,11 +268,13 @@ public abstract class SubscriptionImpl i
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
          *
+         *
          * @param entry   The message to send
+         * @param batch
          * @throws AMQException
          */
         @Override
-        public void send(QueueEntry entry) throws AMQException
+        public void send(QueueEntry entry, boolean batch) throws AMQException
         {
 
             // if we do not need to wait for client acknowledgements
@@ -282,6 +289,7 @@ public abstract class SubscriptionImpl i
 
             synchronized (getChannel())
             {
+                getChannel().getProtocolSession().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
 
@@ -441,10 +449,12 @@ public abstract class SubscriptionImpl i
      * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
      * thread safe.
      *
-     * @param msg   The message to send
+     *
+     * @param entry
+     * @param batch
      * @throws AMQException
      */
-    abstract public void send(QueueEntry msg) throws AMQException;
+    abstract public void send(QueueEntry entry, boolean batch) throws AMQException;
 
 
     public boolean isSuspended()
@@ -578,6 +588,11 @@ public abstract class SubscriptionImpl i
         return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage());
     }
 
+    public boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
     public void getSendLock()
     {
         _stateChangeLock.lock();
@@ -814,4 +829,11 @@ public abstract class SubscriptionImpl i
     {
         return _createTime;
     }
+
+    public void flushBatched()
+    {
+        _channel.getProtocolSession().setDeferFlush(false);
+
+        _channel.getProtocolSession().flushBatched();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Thu Dec 15 13:54:36 2011
@@ -61,6 +61,7 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.framing.AMQShortString;
@@ -91,6 +92,8 @@ public class Subscription_0_10 implement
     private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
 
+    private static final Option[] BATCHED = new Option[] { Option.BATCH };
+
     private final Lock _stateChangeLock = new ReentrantLock();
 
     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
@@ -127,6 +130,8 @@ public class Subscription_0_10 implement
     private final long _createTime = System.currentTimeMillis();
     private final AtomicLong _deliveredCount = new AtomicLong(0);
     private final Map<String, Object> _arguments;
+    private int _deferredMessageCredit;
+    private long _deferredSizeCredit;
 
 
     public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
@@ -137,6 +142,7 @@ public class Subscription_0_10 implement
     {
         _subscriptionID = subscriptionId;
         _session = session;
+        _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _destination = destination;
         _acceptMode = acceptMode;
         _acquireMode = acquireMode;
@@ -325,10 +331,26 @@ public class Subscription_0_10 implement
     }
 
 
-    private class AddMessageDispositionListnerAction implements Runnable
+    public static class AddMessageDispositionListenerAction implements Runnable
     {
-        public MessageTransfer _xfr;
-        public ServerSession.MessageDispositionChangeListener _action;
+        private MessageTransfer _xfr;
+        private ServerSession.MessageDispositionChangeListener _action;
+        private ServerSession _session;
+
+        public AddMessageDispositionListenerAction(ServerSession session)
+        {
+            _session = session;
+        }
+
+        public void setXfr(MessageTransfer xfr)
+        {
+            _xfr = xfr;
+        }
+
+        public void setAction(ServerSession.MessageDispositionChangeListener action)
+        {
+            _action = action;
+        }
 
         public void run()
         {
@@ -339,9 +361,9 @@ public class Subscription_0_10 implement
         }
     }
 
-    private final AddMessageDispositionListnerAction _postIdSettingAction = new AddMessageDispositionListnerAction();
+    private final AddMessageDispositionListenerAction _postIdSettingAction;
 
-    public void send(final QueueEntry entry) throws AMQException
+    public void send(final QueueEntry entry, boolean batch) throws AMQException
     {
         ServerMessage serverMsg = entry.getMessage();
 
@@ -586,26 +608,27 @@ public class Subscription_0_10 implement
                                             {
                                                 public void onComplete(Method method)
                                                 {
-                                                    restoreCredit(entry);
+                                                    deferredAddCredit(1, entry.getSize());
                                                 }
                                             });
             }
 
 
-            _postIdSettingAction._xfr = xfr;
+            _postIdSettingAction.setXfr(xfr);
             if(_acceptMode == MessageAcceptMode.EXPLICIT)
             {
-                _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+                _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
             }
             else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
             {
-                _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+                _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
             }
             else
             {
-                _postIdSettingAction._action = null;
+                _postIdSettingAction.setAction(null);
             }
 
+
             _session.sendMessage(xfr, _postIdSettingAction);
             entry.incrementDeliveryCount();
             _deliveredCount.incrementAndGet();
@@ -723,6 +746,11 @@ public class Subscription_0_10 implement
         return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
     }
 
+    public boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
     public void getSendLock()
     {
         _stateChangeLock.lock();
@@ -788,6 +816,28 @@ public class Subscription_0_10 implement
         return _properties.get(key);
     }
 
+    private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
+    {
+        _deferredMessageCredit += deferredMessageCredit;
+        _deferredSizeCredit += deferredSizeCredit;
+
+    }
+
+    public void flushCreditState()
+    {
+        flushCreditState(false);
+    }
+    public void flushCreditState(boolean strict)
+    {
+        if(strict || !isSuspended() || _deferredMessageCredit >= 200
+          || !(_creditManager instanceof WindowCreditManager)
+          || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+        {
+            _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+            _deferredMessageCredit = 0;
+            _deferredSizeCredit = 0l;
+        }
+    }
 
     public FlowCreditManager_0_10 getCreditManager()
     {
@@ -890,6 +940,7 @@ public class Subscription_0_10 implement
 
     public void flush() throws AMQException
     {
+        flushCreditState(true);
         _queue.flushSubscription(this);
         stop();
     }
@@ -1029,4 +1080,9 @@ public class Subscription_0_10 implement
         return (LogSubject) this;
     }
 
+
+    public void flushBatched()
+    {
+        _session.getConnection().flush();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Thu Dec 15 13:54:36 2011
@@ -446,6 +446,14 @@ public class ServerConnection extends Co
         }
     }
 
+    public void receivedComplete()
+    {
+        for (Session ssn : getChannels())
+        {
+            ((ServerSession)ssn).flushCreditState();
+        }
+    }
+
     @Override
     public ManagedObject getManagedObject()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Dec 15 13:54:36 2011
@@ -697,6 +697,15 @@ public class ServerSession extends Sessi
         }
     }
 
+    public void flushCreditState()
+    {
+        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        {
+            subscription_0_10.flushCreditState(false);
+        }
+    }
+
     public int getUnacknowledgedMessageCount()
     {
         return _messageDispositionListenerMap.size();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Thu Dec 15 13:54:36 2011
@@ -43,6 +43,8 @@ import org.apache.qpid.server.output.Pro
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TestNetworkConnection;
 
@@ -120,6 +122,11 @@ public class InternalTestProtocolSession
     {
     }
 
+    public ClientDeliveryMethod createDeliveryMethod(int channelId)
+    {
+        return new InternalWriteDeliverMethod(channelId);
+    }
+
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
     }
@@ -213,4 +220,42 @@ public class InternalTestProtocolSession
         ((AMQChannel)session).getProtocolSession().closeSession();
 
     }
+
+    private class InternalWriteDeliverMethod implements ClientDeliveryMethod
+    {
+        private int _channelId;
+
+        public InternalWriteDeliverMethod(int channelId)
+        {
+            _channelId = channelId;
+        }
+
+
+        @Override
+        public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+        {
+            _deliveryCount.incrementAndGet();
+
+            synchronized (_channelDelivers)
+            {
+                Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+
+                if (consumers == null)
+                {
+                    consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+                    _channelDelivers.put(_channelId, consumers);
+                }
+
+                LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getConsumerTag());
+
+                if (consumerDelivers == null)
+                {
+                    consumerDelivers = new LinkedList<DeliveryPair>();
+                    consumers.put(sub.getConsumerTag(), consumerDelivers);
+                }
+
+                consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Thu Dec 15 13:54:36 2011
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.util.InternalBrokerBaseCase;
@@ -143,16 +144,19 @@ public class AckTest extends InternalBro
             qs.add(_queue);
             msg.enqueue(qs);
             MessageMetaData mmd = msg.headersReceived();
-            msg.setStoredMessage(_messageStore.addMessage(mmd));
+            final StoredMessage storedMessage = _messageStore.addMessage(mmd);
+            msg.setStoredMessage(storedMessage);
+            final AMQMessage message = new AMQMessage(storedMessage);
             if(msg.allContentReceived())
             {
                 ServerTransaction txn = new AutoCommitTransaction(_messageStore);
-                txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+                txn.enqueue(_queue, message, new ServerTransaction.Action() {
                     public void postCommit()
                     {
                         try
                         {
-                            _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+
+                            _queue.enqueue(message);
                         }
                         catch (AMQException e)
                         {
@@ -170,6 +174,15 @@ public class AckTest extends InternalBro
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
         }
+        try
+        {
+            Thread.sleep(2000L);
+        }
+        catch (InterruptedException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
     }
 
     /**
@@ -181,9 +194,8 @@ public class AckTest extends InternalBro
         _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount, true);
-
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
-        assertEquals("",msgCount,map.size());
+        assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size());
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -206,7 +218,6 @@ public class AckTest extends InternalBro
         _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
         final int msgCount = 10;
         publishMessages(msgCount);
-
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
         assertTrue(_messageStore.getMessageCount() == 0);
@@ -243,7 +254,7 @@ public class AckTest extends InternalBro
 
         _channel.acknowledgeMessage(5, false);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
-        assertTrue(map.size() == msgCount - 1);
+        assertEquals("Map not expected size",msgCount - 1,map.size());
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -270,6 +281,8 @@ public class AckTest extends InternalBro
         final int msgCount = 10;
         publishMessages(msgCount);
 
+
+
         _channel.acknowledgeMessage(5, true);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 5);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Dec 15 13:54:36 2011
@@ -44,7 +44,6 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
 import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
@@ -189,6 +188,13 @@ public class SimpleAMQQueueTest extends 
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
+        try
+        {
+            Thread.sleep(2000L);
+        }
+        catch(InterruptedException e)
+        {
+        }
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
         assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
 
@@ -430,6 +436,13 @@ public class SimpleAMQQueueTest extends 
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
+        try
+        {
+            Thread.sleep(2000L);
+        }
+        catch (InterruptedException e)
+        {
+        }
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
 
         // Check we cannot add a second subscriber to the queue
@@ -723,7 +736,7 @@ public class SimpleAMQQueueTest extends 
         assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
 
         // call processQueue to deliver the messages
-        testQueue.processQueue(new QueueRunner(testQueue, 1)
+        testQueue.processQueue(new QueueRunner(testQueue)
         {
             @Override
             public void run()
@@ -826,7 +839,7 @@ public class SimpleAMQQueueTest extends 
 
     /**
      * Tests that dequeued message is not copied as part of invocation of
-     * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)}
+     * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
      */
     public void testCopyMessagesWithDequeuedEntry()
     {
@@ -844,7 +857,7 @@ public class SimpleAMQQueueTest extends 
         SimpleAMQQueue queue = createQueue(anotherQueueName);
 
         // create transaction
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
 
         // copy messages into another queue
         _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -876,7 +889,7 @@ public class SimpleAMQQueueTest extends 
 
     /**
      * Tests that dequeued message is not moved as part of invocation of
-     * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)}
+     * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
      */
     public void testMovedMessagesWithDequeuedEntry()
     {
@@ -894,7 +907,7 @@ public class SimpleAMQQueueTest extends 
         SimpleAMQQueue queue = createQueue(anotherQueueName);
 
         // create transaction
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
 
         // move messages into another queue
         _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -927,7 +940,7 @@ public class SimpleAMQQueueTest extends 
     /**
      * Tests that messages in given range including dequeued one are deleted
      * from the queue on invocation of
-     * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)}
+     * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)}
      */
     public void testRemoveMessagesFromQueueWithDequeuedEntry()
     {
@@ -954,7 +967,7 @@ public class SimpleAMQQueueTest extends 
     /**
      * Tests that dequeued message on the top is not accounted and next message
      * is deleted from the queue on invocation of
-     * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)}
+     * {@link SimpleAMQQueue#deleteMessageFromTop()}
      */
     public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
     {
@@ -983,7 +996,7 @@ public class SimpleAMQQueueTest extends 
 
     /**
      * Tests that all messages including dequeued one are deleted from the queue
-     * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)}
+     * on invocation of {@link SimpleAMQQueue#clearQueue()}
      */
     public void testClearQueueWithDequeuedEntry()
     {
@@ -1049,10 +1062,12 @@ public class SimpleAMQQueueTest extends 
         {
             /**
              * Send a message and decrement latch
+             * @param entry
+             * @param batch
              */
-            public void send(QueueEntry msg) throws AMQException
+            public void send(QueueEntry entry, boolean batch) throws AMQException
             {
-                super.send(msg);
+                super.send(entry, batch);
                 latch.countDown();
             }
         };
@@ -1063,7 +1078,7 @@ public class SimpleAMQQueueTest extends 
             testQueue.registerSubscription(subscription, false);
 
             // process queue
-            testQueue.processQueue(new QueueRunner(testQueue, 1)
+            testQueue.processQueue(new QueueRunner(testQueue)
             {
                 public void run()
                 {
@@ -1127,6 +1142,19 @@ public class SimpleAMQQueueTest extends 
                                     {
                                         return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0);
                                     }
+
+                                    @Override
+                                    public boolean acquire(Subscription sub)
+                                    {
+                                        if(((AMQMessage) message).getMessageId().longValue() % 2 == 0)
+                                        {
+                                            return false;
+                                        }
+                                        else
+                                        {
+                                            return super.acquire(sub);
+                                        }
+                                    }
                                 };
                             }
                         };
@@ -1243,6 +1271,14 @@ public class SimpleAMQQueueTest extends 
                 fail("Failure to put message on queue:" + e.getMessage());
             }
         }
+        try
+        {
+            Thread.sleep(2000L);
+        }
+        catch (InterruptedException e)
+        {
+            e.printStackTrace();
+        }
     }
 
     /**

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Thu Dec 15 13:54:36 2011
@@ -125,6 +125,12 @@ public class MockSubscription implements
         return queue;
     }
 
+    public boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
+
     public void getSendLock()
     {
         _stateChangeLock.lock();
@@ -216,7 +222,7 @@ public class MockSubscription implements
     {
     }
 
-    public void send(QueueEntry entry) throws AMQException
+    public void send(QueueEntry entry, boolean batch) throws AMQException
     {
         if (messages.contains(entry))
         {
@@ -225,6 +231,12 @@ public class MockSubscription implements
         messages.add(entry);
     }
 
+    @Override
+    public void flushBatched()
+    {
+
+    }
+
     public void setQueueContext(AMQQueue.Context queueContext)
     {
         _queueContext = queueContext;

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Thu Dec 15 13:54:36 2011
@@ -56,7 +56,7 @@ public class FailoverBehaviourTest exten
     private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
 
     /** Default number of messages to send before failover */
-    private static final int DEFAULT_NUMBER_OF_MESSAGES = 10;
+    private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
 
     /** Actual number of messages to send before failover */
     protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
@@ -1157,7 +1157,6 @@ public class FailoverBehaviourTest exten
     {
         init(acknowledgeMode, false);
         _consumer.close();
-        QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
         _connection.start();
 
         produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
@@ -1165,6 +1164,8 @@ public class FailoverBehaviourTest exten
         {
             _producerSession.commit();
         }
+
+        QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
         return browser;
     }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Thu Dec 15 13:54:36 2011
@@ -54,7 +54,14 @@ public class AMQConnectionTest extends Q
         _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
         _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
     }
-    
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        _connection.close();
+        super.tearDown();    //To change body of overridden methods use File | Settings | File Templates.
+    }
+
     protected void createConnection() throws Exception
     {
         _connection = (AMQConnection) getConnection("guest", "guest");
@@ -67,16 +74,27 @@ public class AMQConnectionTest extends Q
 
     public void testCreateQueueSession() throws JMSException
     {
-        _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE);
+        createQueueSession();
+    }
+
+    private void createQueueSession() throws JMSException
+    {
+        _queueSession =  _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE);
     }
 
     public void testCreateTopicSession() throws JMSException
     {
+        createTopicSession();
+    }
+
+    private void createTopicSession() throws JMSException
+    {
         _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
     }
 
     public void testTopicSessionCreateBrowser() throws JMSException
     {
+        createTopicSession();
         try
         {
             _topicSession.createBrowser(_queue);
@@ -94,6 +112,7 @@ public class AMQConnectionTest extends Q
 
     public void testTopicSessionCreateQueue() throws JMSException
     {
+        createTopicSession();
         try
         {
             _topicSession.createQueue("abc");
@@ -111,6 +130,7 @@ public class AMQConnectionTest extends Q
 
     public void testTopicSessionCreateTemporaryQueue() throws JMSException
     {
+        createTopicSession();
         try
         {
             _topicSession.createTemporaryQueue();
@@ -128,6 +148,7 @@ public class AMQConnectionTest extends Q
 
     public void testQueueSessionCreateTemporaryTopic() throws JMSException
     {
+        createQueueSession();
         try
         {
             _queueSession.createTemporaryTopic();
@@ -145,6 +166,7 @@ public class AMQConnectionTest extends Q
 
     public void testQueueSessionCreateTopic() throws JMSException
     {
+        createQueueSession();
         try
         {
             _queueSession.createTopic("abc");
@@ -162,6 +184,7 @@ public class AMQConnectionTest extends Q
 
     public void testQueueSessionDurableSubscriber() throws JMSException
     {
+        createQueueSession();
         try
         {
             _queueSession.createDurableSubscriber(_topic, "abc");
@@ -179,6 +202,7 @@ public class AMQConnectionTest extends Q
 
     public void testQueueSessionUnsubscribe() throws JMSException
     {
+        createQueueSession();
         try
         {
             _queueSession.unsubscribe("abc");
@@ -243,25 +267,6 @@ public class AMQConnectionTest extends Q
         assertNotNull("Consumer B should have received the message",msg);
     }
     
-    public void testGetChannelID() throws Exception
-    {
-        long maxChannelID = _connection.getMaximumChannelCount();
-        if (isBroker010())
-        {
-            //Usable numbers are 0 to N-1 when using 0-10
-            //and 1 to N for 0-8/0-9
-            maxChannelID = maxChannelID-1;
-        }
-        for (int j = 0; j < 3; j++)
-        {
-            int i = isBroker010() ? 0 : 1;
-            for ( ; i <= maxChannelID; i++)
-            {
-                int id = _connection.getNextChannelID();
-                assertEquals("Unexpected number on iteration "+j, i, id);
-                _connection.deregisterSession(id);
-            }
-        }
-    }
+
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org