You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/11/06 11:35:34 UTC

svn commit: r1712932 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/port/ broker-core/src/main/java/org/apache/qpid/server/plugin/ broker-core/src/main/java/org/a...

Author: lquack
Date: Fri Nov  6 10:35:34 2015
New Revision: 1712932

URL: http://svn.apache.org/viewvc?rev=1712932&view=rev
Log:
QPID-6788: [Java Broker] Limit the amount of memory that can be occupied by outbound messages on a connection.

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
      - copied, changed from r1712787, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Fri Nov  6 10:35:34 2015
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.StateChangeListener;
 
 public abstract class AbstractConsumerTarget implements ConsumerTarget
@@ -58,6 +59,10 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean processPending()
     {
+        if (!getSessionModel().getAMQPConnection().isIOThread())
+        {
+            return false;
+        }
         if(hasMessagesToSend())
         {
             sendNextMessage();
@@ -88,10 +93,10 @@ public abstract class AbstractConsumerTa
     @Override
     public final boolean isSuspended()
     {
-        return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || doIsSuspended();
+        return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
     }
 
-    protected abstract boolean doIsSuspended();
+    protected abstract boolean isFlowSuspended();
 
     public final State getState()
     {
@@ -184,8 +189,10 @@ public abstract class AbstractConsumerTa
     @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
+        AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
+        amqpConnection.reserveOutboundMessageSpace(entry.getMessage().getSize());
         _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
-        getSessionModel().getAMQPConnection().notifyWork();
+        amqpConnection.notifyWork();
         return entry.getMessage().getSize();
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Fri Nov  6 10:35:34 2015
@@ -57,6 +57,7 @@ public interface AmqpPort<X extends Amqp
     String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "qpid.port.amqp.threadPool.keep_alive_timeout";
 
     String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
+    String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -88,6 +89,9 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
     int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
+    long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
 
     SSLContext getSSLContext();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java Fri Nov  6 10:35:34 2015
@@ -24,15 +24,15 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 public interface ProtocolEngineCreator extends Pluggable
 {
     Protocol getVersion();
     byte[] getHeaderIdentifier();
     ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                     NetworkConnection network,
+                                     ServerNetworkConnection network,
                                      AmqpPort<?> port,
                                      Transport transport,
                                      long id,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Nov  6 10:35:34 2015
@@ -57,6 +57,7 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.StateChangeListener;
 
 class QueueConsumerImpl
@@ -342,13 +343,28 @@ class QueueConsumerImpl
     @Override
     public final void flush()
     {
-        _queue.flushConsumer(this);
-        _target.processPending();
+        AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
+        try
+        {
+            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
+            _queue.flushConsumer(this);
+            _target.processPending();
+        }
+        finally
+        {
+            connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
+        }
+
     }
 
     public boolean resend(final QueueEntry entry)
     {
-        return getQueue().resend(entry, this);
+        boolean messageWasResent = getQueue().resend(entry, this);
+        if (messageWasResent)
+        {
+            _target.processPending();
+        }
+        return messageWasResent;
     }
 
     public final long getConsumerNumber()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Fri Nov  6 10:35:34 2015
@@ -32,6 +32,8 @@ public interface AMQPConnection<C extend
 {
     boolean isMessageAssignmentSuspended();
 
+    void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean override);
+
     long getConnectionId();
 
     Principal getAuthorizedPrincipal();
@@ -64,4 +66,7 @@ public interface AMQPConnection<C extend
 
     void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
 
+    void reserveOutboundMessageSpace(long size);
+
+    boolean isIOThread();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Nov  6 10:35:34 2015
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
@@ -76,7 +77,7 @@ public abstract class AbstractAMQPConnec
     private static final Logger _logger = LoggerFactory.getLogger(AbstractAMQPConnection.class);
 
     private final Broker<?> _broker;
-    private final NetworkConnection _network;
+    private final ServerNetworkConnection _network;
     private final AmqpPort<?> _port;
     private final Transport _transport;
     private final Protocol _protocol;
@@ -87,6 +88,8 @@ public abstract class AbstractAMQPConnec
             new CopyOnWriteArrayList<>();
 
     private final LogSubject _logSubject;
+    private final AtomicReference<Thread> _messageAssignmentAllowedThread = new AtomicReference<>();
+    private final AtomicBoolean _messageAssignmentSuspended = new AtomicBoolean();
     private String _clientProduct;
     private String _clientVersion;
     private String _remoteProcessPid;
@@ -100,11 +103,10 @@ public abstract class AbstractAMQPConnec
     private volatile long _lastReadTime;
     private volatile long _lastWriteTime;
     private volatile AccessControlContext _accessControllerContext;
-
-    private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+    private volatile Thread _ioThread;
 
     public AbstractAMQPConnection(Broker<?> broker,
-                                  NetworkConnection network,
+                                  ServerNetworkConnection network,
                                   AmqpPort<?> port,
                                   Transport transport,
                                   Protocol protocol,
@@ -387,6 +389,60 @@ public abstract class AbstractAMQPConnec
         _clientId = clientId;
     }
 
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        Thread currentThread = Thread.currentThread();
+        if (_messageAssignmentAllowedThread.get() == currentThread && currentThread == _ioThread)
+        {
+            return false;
+        }
+        return _messageAssignmentSuspended.get();
+    }
+
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        _messageAssignmentSuspended.set(messageAssignmentSuspended);
+
+        for(AMQSessionModel<?> session : getSessionModels())
+        {
+            if (messageAssignmentSuspended)
+            {
+                session.ensureConsumersNoticedStateChange();
+            }
+            else
+            {
+                session.notifyConsumerTargetCurrentStates();
+            }
+        }
+    }
+
+    @Override
+    public void alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(boolean allowed)
+    {
+        if (allowed)
+        {
+            _messageAssignmentAllowedThread.set(Thread.currentThread());
+        }
+        else
+        {
+            _messageAssignmentAllowedThread.set(null);
+        }
+    }
+
+    @Override
+    public void setIOThread(final Thread ioThread)
+    {
+        _ioThread = ioThread;
+    }
+
+    @Override
+    public boolean isIOThread()
+    {
+        return Thread.currentThread() == _ioThread;
+    }
+
     private <T> T runAsSubject(PrivilegedAction<T> action)
     {
         return Subject.doAs(_subject, action);
@@ -557,28 +613,9 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        Thread lock = _messageAssignmentSuspended.get();
-        return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
-    }
-
-    @Override
-    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    public void reserveOutboundMessageSpace(final long size)
     {
-        _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
-
-        for(AMQSessionModel<?> session : getSessionModels())
-        {
-            if (messageAssignmentSuspended)
-            {
-                session.ensureConsumersNoticedStateChange();
-            }
-            else
-            {
-                session.notifyConsumerTargetCurrentStates();
-            }
-        }
+        _network.reserveOutboundMessageSpace(size);
     }
 
     protected void markTransportClosed()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Fri Nov  6 10:35:34 2015
@@ -46,7 +46,6 @@ import org.apache.qpid.server.security.M
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 public class MultiVersionProtocolEngine implements ProtocolEngine
 {
@@ -63,7 +62,7 @@ public class MultiVersionProtocolEngine
     private Set<Protocol> _supported;
     private String _fqdn;
     private final Broker<?> _broker;
-    private NetworkConnection _network;
+    private ServerNetworkConnection _network;
     private ByteBufferSender _sender;
     private final Protocol _defaultSupportedReply;
 
@@ -141,6 +140,12 @@ public class MultiVersionProtocolEngine
         _delegate.received(msg);
     }
 
+    @Override
+    public void setIOThread(final Thread ioThread)
+    {
+        _delegate.setIOThread(ioThread);
+    }
+
     public long getConnectionId()
     {
         return _id;
@@ -164,7 +169,7 @@ public class MultiVersionProtocolEngine
         _delegate.setTransportBlockedForWriting(blocked);
     }
 
-    public void setNetworkConnection(NetworkConnection network)
+    public void setNetworkConnection(ServerNetworkConnection network)
     {
         _network = network;
         SocketAddress address = _network.getLocalAddress();
@@ -281,6 +286,12 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
+        public void setIOThread(final Thread ioThread)
+        {
+
+        }
+
+        @Override
         public void closed()
         {
 
@@ -520,6 +531,12 @@ public class MultiVersionProtocolEngine
 
         }
 
+        @Override
+        public void setIOThread(final Thread ioThread)
+        {
+
+        }
+
 
         @Override
         public Subject getSubject()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Fri Nov  6 10:35:34 2015
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,12 +39,11 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.util.SystemUtils;
 
-public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
+public class NonBlockingConnection implements ServerNetworkConnection, ByteBufferSender
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
 
@@ -56,6 +56,8 @@ public class NonBlockingConnection imple
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final ProtocolEngine _protocolEngine;
     private final Runnable _onTransportEncryptionAction;
+    private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+    private final long _outboundMessageBufferLimit;
 
     private volatile int _maxReadIdle;
     private volatile int _maxWriteIdle;
@@ -87,6 +89,9 @@ public class NonBlockingConnection imple
         _port = port;
         _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
 
+        _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
+                                                                   AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
+
         protocolEngine.setWorkListener(new Action<ProtocolEngine>()
         {
             @Override
@@ -197,6 +202,15 @@ public class NonBlockingConnection imple
         return _maxWriteIdle;
     }
 
+    @Override
+    public void reserveOutboundMessageSpace(long size)
+    {
+        if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
+        {
+            _protocolEngine.setMessageAssignmentSuspended(true);
+        }
+    }
+
     public boolean canRead()
     {
         return _fullyWritten;
@@ -226,11 +240,12 @@ public class NonBlockingConnection imple
                     getTicker().tick(currentTime);
                 }
 
+                _protocolEngine.setIOThread(Thread.currentThread());
                 _protocolEngine.setMessageAssignmentSuspended(true);
 
                 if (!_fullyWritten)
                 {
-                    _fullyWritten = doWrite();
+                    doWrite();
                 }
 
                 if (_fullyWritten)
@@ -239,17 +254,23 @@ public class NonBlockingConnection imple
 
                     _protocolEngine.setTransportBlockedForWriting(!doWrite());
                     boolean dataRead = doRead();
-                    _fullyWritten = doWrite();
-                    _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+                    _protocolEngine.setTransportBlockedForWriting(!doWrite());
 
-                    if (dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
+                    if (!_fullyWritten || dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
                     {
                         _protocolEngine.notifyWork();
                     }
 
-                    // tell all consumer targets that it is okay to accept more
-                    _protocolEngine.setMessageAssignmentSuspended(false);
+                    if (_fullyWritten)
+                    {
+                        _protocolEngine.setMessageAssignmentSuspended(false);
+                    }
                 }
+                else
+                {
+                    _protocolEngine.notifyWork();
+                }
+
             }
             catch (IOException | ConnectionScopedRuntimeException e)
             {
@@ -260,6 +281,10 @@ public class NonBlockingConnection imple
                     _protocolEngine.notifyWork();
                 }
             }
+            finally
+            {
+                _protocolEngine.setIOThread(null);
+            }
         }
 
         final boolean closed = _closed.get();
@@ -390,7 +415,7 @@ public class NonBlockingConnection imple
 
     private boolean doWrite() throws IOException
     {
-        final boolean result = _delegate.doWrite(_buffers);
+        _fullyWritten = _delegate.doWrite(_buffers);
         while(!_buffers.isEmpty())
         {
             QpidByteBuffer buf = _buffers.peek();
@@ -401,7 +426,11 @@ public class NonBlockingConnection imple
             _buffers.poll();
             buf.dispose();
         }
-        return result;
+        if (_fullyWritten)
+        {
+            _usedOutboundMessageSpace.set(0);
+        }
+        return _fullyWritten;
 
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Fri Nov  6 10:35:34 2015
@@ -42,9 +42,11 @@ public interface ProtocolEngine extends
 
    // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
    // heartbeat)
+   @Override
    void writerIdle();
 
    // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+   @Override
    void readerIdle();
 
    Subject getSubject();
@@ -73,4 +75,5 @@ public interface ProtocolEngine extends
 
    void received(QpidByteBuffer msg);
 
+   void setIOThread(Thread ioThread);
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java (from r1712787, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java&r1=1712787&r2=1712932&rev=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java Fri Nov  6 10:35:34 2015
@@ -1,5 +1,4 @@
-package org.apache.qpid.server.plugin;/*
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,28 +15,13 @@ package org.apache.qpid.server.plugin;/*
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.transport.network.AggregateTicker;
+package org.apache.qpid.server.transport;
+
 import org.apache.qpid.transport.network.NetworkConnection;
 
-public interface ProtocolEngineCreator extends Pluggable
+public interface ServerNetworkConnection extends NetworkConnection
 {
-    Protocol getVersion();
-    byte[] getHeaderIdentifier();
-    ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                     NetworkConnection network,
-                                     AmqpPort<?> port,
-                                     Transport transport,
-                                     long id,
-                                     final AggregateTicker aggregateTicker);
-
-    byte[] getSuggestedAlternativeHeader();
+    void reserveOutboundMessageSpace(long size);
 }
-

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Fri Nov  6 10:35:34 2015
@@ -38,6 +38,9 @@ import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManagerFactory;
 import javax.xml.bind.DatatypeConverter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
@@ -45,6 +48,7 @@ import org.apache.qpid.test.utils.QpidTe
 
 public class TCPandSSLTransportTest extends QpidTestCase
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TCPandSSLTransportTest.class);
 
     public void testNoSSLv3SupportOnSSLOnlyPort() throws Exception
     {
@@ -82,9 +86,8 @@ public class TCPandSSLTransportTest exte
         }
         catch(SSLHandshakeException e)
         {
-            // pass
+            LOGGER.error("Should be able to connect using TLSv1.1", e);
             fail("Should be able to connect using TLSv1.1");
-
         }
     }
 
@@ -113,6 +116,7 @@ public class TCPandSSLTransportTest exte
         when(port.getNumberOfSelectors()).thenReturn(1);
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
+        when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
 
         TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
                                                               port,

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Nov  6 10:35:34 2015
@@ -47,6 +47,7 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -54,7 +55,6 @@ import org.apache.qpid.transport.ByteBuf
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 
 public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
@@ -63,17 +63,18 @@ public class AMQPConnection_0_10 extends
     private final ServerInputHandler _inputHandler;
 
 
-    private final NetworkConnection _network;
+    private final ServerNetworkConnection _network;
     private final ServerConnection _connection;
 
     private volatile boolean _transportBlockedForWriting;
+
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
     private ServerDisassembler _disassembler;
 
 
     public AMQPConnection_0_10(final Broker<?> broker,
-                               NetworkConnection network,
+                               ServerNetworkConnection network,
                                final AmqpPort<?> port,
                                final Transport transport,
                                final long id,
@@ -122,7 +123,6 @@ public class AMQPConnection_0_10 extends
         }, getAccessControllerContext());
     }
 
-
     private ByteBufferSender wrapSender(final ByteBufferSender sender)
     {
         return new ByteBufferSender()
@@ -259,7 +259,10 @@ public class AMQPConnection_0_10 extends
     @Override
     public void processPending()
     {
-        _connection.processPending();
+        if (isIOThread())
+        {
+            _connection.processPending();
+        }
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Nov  6 10:35:34 2015
@@ -112,7 +112,7 @@ public class ConsumerTarget_0_10 extends
     }
 
     @Override
-    public boolean doIsSuspended()
+    public boolean isFlowSuspended()
     {
         return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped(); // TODO check for Session suspension
     }
@@ -385,7 +385,7 @@ public class ConsumerTarget_0_10 extends
 
     public void flushCreditState(boolean strict)
     {
-        if(strict || !isSuspended() || _deferredMessageCredit >= 200
+        if(strict || !isFlowSuspended() || _deferredMessageCredit >= 200
           || !(_creditManager instanceof WindowCreditManager)
           || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Fri Nov  6 10:35:34 2015
@@ -27,8 +27,8 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
@@ -62,7 +62,7 @@ public class ProtocolEngineCreator_0_10
     }
 
     public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            NetworkConnection network,
+                                            ServerNetworkConnection network,
                                             AmqpPort<?> port,
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Nov  6 10:35:34 2015
@@ -1177,6 +1177,11 @@ public class ServerSession extends Sessi
     @Override
     public boolean processPending()
     {
+        if (!getAMQPConnection().isIOThread())
+        {
+            return false;
+        }
+
         boolean desiredBlockingState = _blocking.get();
         if (desiredBlockingState != _wireBlockingState)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Nov  6 10:35:34 2015
@@ -3750,6 +3750,10 @@ public class AMQChannel
     @Override
     public boolean processPending()
     {
+        if (!getAMQPConnection().isIOThread())
+        {
+            return false;
+        }
 
         boolean desiredBlockingState = _blocking.get();
         if (desiredBlockingState != _wireBlockingState)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Nov  6 10:35:34 2015
@@ -73,7 +73,6 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
@@ -81,6 +80,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -88,7 +88,6 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 public class AMQPConnection_0_8
         extends AbstractAMQPConnection<AMQPConnection_0_8>
@@ -154,7 +153,7 @@ public class AMQPConnection_0_8
     private int _maxFrameSize;
     private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
 
-    private final NetworkConnection _network;
+    private final ServerNetworkConnection _network;
     private final ByteBufferSender _sender;
 
     private volatile boolean _deferFlush;
@@ -179,7 +178,7 @@ public class AMQPConnection_0_8
     private volatile boolean _transportBlockedForWriting;
 
     public AMQPConnection_0_8(Broker<?> broker,
-                              NetworkConnection network,
+                              ServerNetworkConnection network,
                               AmqpPort<?> port,
                               Transport transport,
                               Protocol protocol,
@@ -1554,6 +1553,11 @@ public class AMQPConnection_0_8
     @Override
     public void processPending()
     {
+        if (!isIOThread())
+        {
+            return;
+        }
+
         List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
         while(!sessionsWithPending.isEmpty())
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Nov  6 10:35:34 2015
@@ -364,7 +364,7 @@ public abstract class ConsumerTarget_0_8
     }
 
     @Override
-    public boolean doIsSuspended()
+    public boolean isFlowSuspended()
     {
         return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getAMQPConnection().isConnectionStopped();
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java Fri Nov  6 10:35:34 2015
@@ -27,8 +27,8 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_0_8 implements ProtocolEngineCreator
@@ -60,7 +60,7 @@ public class ProtocolEngineCreator_0_8 i
     }
 
     public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            NetworkConnection network,
+                                            ServerNetworkConnection network,
                                             AmqpPort<?> port,
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java Fri Nov  6 10:35:34 2015
@@ -27,8 +27,8 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_0_9 implements ProtocolEngineCreator
@@ -60,7 +60,7 @@ public class ProtocolEngineCreator_0_9 i
     }
 
     public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            NetworkConnection network,
+                                            ServerNetworkConnection network,
                                             AmqpPort<?> port,
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java Fri Nov  6 10:35:34 2015
@@ -27,8 +27,8 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_0_9_1 implements ProtocolEngineCreator
@@ -61,7 +61,7 @@ public class ProtocolEngineCreator_0_9_1
     }
 
     public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            NetworkConnection network,
+                                            ServerNetworkConnection network,
                                             AmqpPort<?> port,
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java Fri Nov  6 10:35:34 2015
@@ -56,12 +56,12 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 public class AMQPConnection_0_8Test extends QpidTestCase
 {
@@ -75,7 +75,7 @@ public class AMQPConnection_0_8Test exte
     private VirtualHostNode _virtualHostNode;
     private VirtualHostImpl _virtualHost;
     private AmqpPort _port;
-    private NetworkConnection _network;
+    private ServerNetworkConnection _network;
     private Transport _transport;
     private Protocol _protocol;
     private AggregateTicker _ticker;
@@ -151,7 +151,7 @@ public class AMQPConnection_0_8Test exte
 
         _sender = mock(ByteBufferSender.class);
 
-        _network = mock(NetworkConnection.class);
+        _network = mock(ServerNetworkConnection.class);
         when(_network.getSender()).thenReturn(_sender);
         when(_network.getLocalAddress()).thenReturn(new InetSocketAddress("localhost", 12345));
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Nov  6 10:35:34 2015
@@ -71,11 +71,11 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
 import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.transport.NonBlockingConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0>
         implements FrameOutputHandler
@@ -142,8 +142,7 @@ public class AMQPConnection_1_0 extends
 
     private State _state = State.A;
 
-
-    public AMQPConnection_1_0(final Broker<?> broker, final NetworkConnection network,
+    public AMQPConnection_1_0(final Broker<?> broker, final ServerNetworkConnection network,
                               AmqpPort<?> port, Transport transport, long id,
                               final AggregateTicker aggregateTicker,
                               final boolean useSASL)
@@ -173,7 +172,7 @@ public class AMQPConnection_1_0 extends
     }
 
     public static Connection_1_0 createConnection(final Broker<?> broker,
-                                                  final NetworkConnection network,
+                                                  final ServerNetworkConnection network,
                                                   final AmqpPort<?> port,
                                                   final Transport transport,
                                                   final long id,
@@ -279,7 +278,7 @@ public class AMQPConnection_1_0 extends
     }
 
     private static SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator,
-                                                           final NetworkConnection network)
+                                                           final ServerNetworkConnection network)
     {
         return new SaslServerProvider()
         {
@@ -555,8 +554,10 @@ public class AMQPConnection_1_0 extends
     @Override
     public void processPending()
     {
-        _connection.processPending();
-
+        if (isIOThread())
+        {
+            _connection.processPending();
+        }
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Nov  6 10:35:34 2015
@@ -91,7 +91,7 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
-    public boolean doIsSuspended()
+    public boolean isFlowSuspended()
     {
         return _link.getSession().getAMQPConnection().isConnectionStopped() || getState() != State.ACTIVE;
 
@@ -335,7 +335,7 @@ class ConsumerTarget_1_0 extends Abstrac
         synchronized(_link.getLock())
         {
             ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
-            if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
+            if(isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
             {
                 updateState(State.SUSPENDED, State.ACTIVE);
                 _transactionId = _link.getTransactionId();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java Fri Nov  6 10:35:34 2015
@@ -36,8 +36,8 @@ import org.apache.qpid.server.plugin.Pro
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_1_0_0 implements ProtocolEngineCreator
@@ -71,7 +71,7 @@ public class ProtocolEngineCreator_1_0_0
     }
 
     public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            NetworkConnection network,
+                                            ServerNetworkConnection network,
                                             AmqpPort<?> port,
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java Fri Nov  6 10:35:34 2015
@@ -27,8 +27,8 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator
@@ -60,7 +60,7 @@ public class ProtocolEngineCreator_1_0_0
     }
 
     public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            NetworkConnection network,
+                                            ServerNetworkConnection network,
                                             AmqpPort<?> port,
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Nov  6 10:35:34 2015
@@ -919,6 +919,11 @@ public class Session_1_0 implements Sess
     @Override
     public boolean processPending()
     {
+        if (!getAMQPConnection().isIOThread())
+        {
+            return false;
+        }
+
         boolean consumerListNeedsRefreshing;
         if(_consumersWithPendingWork.isEmpty())
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Fri Nov  6 10:35:34 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.bytebuffer.QpidBy
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -64,17 +65,15 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
-import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 public class ProtocolEngine_1_0_0Test extends QpidTestCase
 {
     private AMQPConnection_1_0 _protocolEngine_1_0_0;
-    private NetworkConnection _networkConnection;
+    private ServerNetworkConnection _networkConnection;
     private Broker<?> _broker;
     private AmqpPort _port;
     private SubjectCreator _subjectCreator;
@@ -88,7 +87,7 @@ public class ProtocolEngine_1_0_0Test ex
     public void setUp() throws Exception
     {
         super.setUp();
-        _networkConnection = mock(NonBlockingConnection.class);
+        _networkConnection = mock(ServerNetworkConnection.class);
         _broker = mock(Broker.class);
         when(_broker.getModel()).thenReturn(BrokerModel.getInstance());
         final TaskExecutor taskExecutor = new TaskExecutorImpl();

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1712932&r1=1712931&r2=1712932&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Nov  6 10:35:34 2015
@@ -58,10 +58,10 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
 import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 class WebSocketProvider implements AcceptingTransport
@@ -289,7 +289,7 @@ class WebSocketProvider implements Accep
         }
     }
 
-    private class ConnectionWrapper implements NetworkConnection, ByteBufferSender
+    private class ConnectionWrapper implements ServerNetworkConnection, ByteBufferSender
     {
         private final WebSocket.Connection _connection;
         private final SocketAddress _localAddress;
@@ -393,6 +393,12 @@ class WebSocketProvider implements Accep
             return _maxWriteIdle;
         }
 
+        @Override
+        public void reserveOutboundMessageSpace(final long size)
+        {
+            // TODO
+        }
+
         void setPeerCertificate(final Certificate certificate)
         {
             _certificate = certificate;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org