You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/12/11 18:14:35 UTC

svn commit: r1644704 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/flow/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/test/java/org/apache/qpid/server/consumer/ bro...

Author: kwall
Date: Thu Dec 11 17:14:34 2014
New Revision: 1644704

URL: http://svn.apache.org/r1644704
Log:
Extend credit managers to be aware of transport backpressue

Added:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
      - copied, changed from r1644644, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
      - copied, changed from r1644644, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
      - copied, changed from r1644644, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
Removed:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Thu Dec 11 17:14:34 2014
@@ -24,10 +24,6 @@ package org.apache.qpid.server.flow;
 
 public interface FlowCreditManager
 {
-    long getMessageCredit();
-
-    long getBytesCredit();
-
     public static interface FlowCreditManagerListener
     {
         void creditStateChanged(boolean hasCredit);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Dec 11 17:14:34 2014
@@ -113,4 +113,6 @@ public interface AMQSessionModel<T exten
      * @return the time of the last activity or 0 if not in a transaction
      */
     long getTransactionUpdateTime();
+
+    void transportStateChanged();
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Thu Dec 11 17:14:34 2014
@@ -157,6 +157,18 @@ public class MultiVersionProtocolEngine
         return _delegate.getSubject();
     }
 
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _delegate.isTransportBlockedForWriting();
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _delegate.setTransportBlockedForWriting(blocked);
+    }
+
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
     public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
@@ -268,6 +280,17 @@ public class MultiVersionProtocolEngine
         {
             return new Subject();
         }
+
+        @Override
+        public boolean isTransportBlockedForWriting()
+        {
+            return false;
+        }
+
+        @Override
+        public void setTransportBlockedForWriting(final boolean blocked)
+        {
+        }
     }
 
     private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -408,6 +431,17 @@ public class MultiVersionProtocolEngine
             return _delegate.getSubject();
         }
 
+        @Override
+        public boolean isTransportBlockedForWriting()
+        {
+            return false;
+        }
+
+        @Override
+        public void setTransportBlockedForWriting(final boolean blocked)
+        {
+        }
+
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Thu Dec 11 17:14:34 2014
@@ -456,6 +456,12 @@ public class MockConsumer implements Con
         {
             return 0;
         }
+
+        @Override
+        public void transportStateChanged()
+        {
+
+        }
     }
 
     private static class MockConnectionModel implements AMQConnectionModel
@@ -663,5 +669,7 @@ public class MockConsumer implements Con
         {
 
         }
+
+
     }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Dec 11 17:14:34 2014
@@ -158,6 +158,10 @@ public class ConsumerTarget_0_10 extends
         return _name;
     }
 
+    public void transportStateChanged()
+    {
+        _creditManager.restoreCredit(0, 0);
+    }
 
     public static class AddMessageDispositionListenerAction implements Runnable
     {
@@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends
         switch(flowMode)
         {
             case CREDIT:
-                _creditManager = new CreditCreditManager(0l,0l);
+                _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
                 break;
             case WINDOW:
-                _creditManager = new WindowCreditManager(0l,0l);
+                _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
                 break;
             default:
                 // this should never happen, as 0-10 is finalised and so the enum should never change

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Thu Dec 11 17:14:34 2014
@@ -21,48 +21,27 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
 public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
+    private final ServerProtocolEngine _serverProtocolEngine;
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
 
-    public CreditCreditManager(long bytesCredit, long messageCredit)
+    public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine)
     {
+        _serverProtocolEngine = serverProtocolEngine;
         _bytesCredit = bytesCredit;
         _messageCredit = messageCredit;
         setSuspended(!hasCredit());
 
     }
 
-
-    public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
-    {
-        _bytesCredit = bytesCredit;
-        _messageCredit = messageCredit;
-
-        setSuspended(!hasCredit());
-
-    }
-
-
-    public long getMessageCredit()
-    {
-         return _messageCredit == -1L
-                    ? Long.MAX_VALUE
-                    : _messageCredit;
-    }
-
-    public long getBytesCredit()
-    {
-        return _bytesCredit == -1L
-                    ? Long.MAX_VALUE
-                    : _bytesCredit;
-    }
-
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
+        setSuspended(!hasCredit());
     }
 
 
@@ -107,12 +86,17 @@ public class CreditCreditManager extends
     public synchronized boolean hasCredit()
     {
         // Note !=, if credit is < 0 that indicates infinite credit
-        return (_bytesCredit != 0L  && _messageCredit != 0L);
+        return (_bytesCredit != 0L  && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting());
     }
 
     public synchronized boolean useCreditForMessage(long msgSize)
     {
-        if(_messageCredit >= 0L)
+        if (_serverProtocolEngine.isTransportBlockedForWriting())
+        {
+            setSuspended(true);
+            return false;
+        }
+        else if(_messageCredit >= 0L)
         {
             if(_messageCredit > 0)
             {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Thu Dec 11 17:14:34 2014
@@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10
         conn.setRemoteAddress(network.getRemoteAddress());
         conn.setLocalAddress(network.getLocalAddress());
 
-        return new ProtocolEngine_0_10( conn, network);
+        ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network);
+        conn.setProtocolEngine(protocolEngine);
+
+        return protocolEngine;
     }
 
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Thu Dec 11 17:14:34 2014
@@ -54,6 +54,7 @@ public class ProtocolEngine_0_10  extend
     private long _createTime = System.currentTimeMillis();
     private long _lastReadTime = _createTime;
     private long _lastWriteTime = _createTime;
+    private volatile boolean _transportBlockedForWriting;
 
     public ProtocolEngine_0_10(ServerConnection conn,
                                NetworkConnection network)
@@ -249,4 +250,18 @@ public class ProtocolEngine_0_10  extend
     {
         return _connection.getAuthorizedSubject();
     }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        _connection.transportStateChanged();
+    }
+
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Dec 11 17:14:34 2014
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.auth.Subject;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
@@ -90,6 +91,8 @@ public class ServerConnection extends Co
     private int _messageCompressionThreshold;
     private int _maxMessageSize;
 
+    private ServerProtocolEngine _serverProtocolEngine;
+
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
                             final AmqpPort<?> port,
@@ -189,6 +192,16 @@ public class ServerConnection extends Co
         super.setConnectionDelegate(delegate);
     }
 
+    public ServerProtocolEngine getProtocolEngine()
+    {
+        return _serverProtocolEngine;
+    }
+
+    public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+    {
+        _serverProtocolEngine = serverProtocolEngine;
+    }
+
     public VirtualHostImpl<?,?,?> getVirtualHost()
     {
         return _virtualHost;
@@ -664,4 +677,12 @@ public class ServerConnection extends Co
     {
         return _maxMessageSize;
     }
+
+    public void transportStateChanged()
+    {
+        for (AMQSessionModel ssn : getSessionModels())
+        {
+            ssn.transportStateChanged();
+        }
+    }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Dec 11 17:14:34 2014
@@ -56,6 +56,7 @@ import org.apache.qpid.server.Transactio
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -874,6 +875,15 @@ public class ServerSession extends Sessi
     }
 
     @Override
+    public void transportStateChanged()
+    {
+        for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+        {
+            consumerTarget.transportStateChanged();
+        }
+    }
+
+    @Override
     public Object getConnectionReference()
     {
         return getConnection().getReference();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Thu Dec 11 17:14:34 2014
@@ -35,6 +35,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -244,8 +245,8 @@ public class ServerSessionDelegate exten
                 }
                 else
                 {
-
-                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+                    ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine();
+                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine);
 
                     FilterManager filterManager = null;
                     try

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Thu Dec 11 17:14:34 2014
@@ -21,11 +21,14 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.log4j.Logger;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
 public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
     private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+    private final ServerProtocolEngine _serverProtocolEngine;
 
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
@@ -33,39 +36,22 @@ public class WindowCreditManager extends
     private volatile long _bytesUsed;
     private volatile long _messageUsed;
 
-     public WindowCreditManager()
-     {
-         this(0L, 0L);
-     }
-
-    public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+    public WindowCreditManager(long bytesCreditLimit,
+                               long messageCreditLimit,
+                               ServerProtocolEngine serverProtocolEngine)
     {
+        _serverProtocolEngine = serverProtocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
         setSuspended(!hasCredit());
 
     }
 
-    public long getBytesCreditLimit()
-    {
-        return _bytesCreditLimit;
-    }
-
     public long getMessageCreditLimit()
     {
         return _messageCreditLimit;
     }
 
-    public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
-    {
-        _bytesCreditLimit = bytesCreditLimit;
-        _messageCreditLimit = messageCreditLimit;
-
-        setSuspended(!hasCredit());
-
-    }
-
-
     public long getMessageCredit()
     {
          return _messageCreditLimit == -1L
@@ -121,12 +107,18 @@ public class WindowCreditManager extends
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
-                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
+                && !_serverProtocolEngine.isTransportBlockedForWriting();
     }
 
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
-        if(_messageCreditLimit >= 0L)
+        if (_serverProtocolEngine.isTransportBlockedForWriting())
+        {
+            setSuspended(true);
+            return false;
+        }
+        else if(_messageCreditLimit >= 0L)
         {
             if(_messageUsed < _messageCreditLimit)
             {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java Thu Dec 11 17:14:34 2014
@@ -20,17 +20,25 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class WindowCreditManagerTest extends QpidTestCase
 {
     private WindowCreditManager _creditManager;
+    private ServerProtocolEngine _protocolEngine;
 
     protected void setUp() throws Exception
     {
         super.setUp();
-        _creditManager = new WindowCreditManager();
+
+        _protocolEngine = mock(ServerProtocolEngine.class);
+        when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
+
+        _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
     }
 
     /**

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Dec 11 17:14:34 2014
@@ -66,8 +66,6 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -131,7 +129,8 @@ public class AMQChannel
     private final int _channelId;
 
 
-    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
+    private final Pre0_10CreditManager _creditManager;
+    private final FlowCreditManager _noAckCreditManager;
 
     /**
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -213,6 +212,9 @@ public class AMQChannel
 
     public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
     {
+        _creditManager = new Pre0_10CreditManager(0l,0l, connection);
+        _noAckCreditManager = new NoAckCreditManager(connection);
+
         _connection = connection;
         _channelId = channelId;
 
@@ -699,7 +701,7 @@ public class AMQChannel
 
         if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
-            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager);
         }
         else if(acks)
         {
@@ -709,7 +711,7 @@ public class AMQChannel
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
@@ -1644,6 +1646,7 @@ public class AMQChannel
         }
     }
 
+
     public synchronized void block(AMQQueue queue)
     {
         if(_blockingEntities.add(queue))
@@ -1672,6 +1675,13 @@ public class AMQChannel
     }
 
     @Override
+    public void transportStateChanged()
+    {
+        _creditManager.restoreCredit(0, 0);
+        _noAckCreditManager.restoreCredit(0, 0);
+    }
+
+    @Override
     public Object getConnectionReference()
     {
         return getConnection().getReference();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Dec 11 17:14:34 2014
@@ -188,6 +188,7 @@ public class AMQProtocolEngine implement
     private int _currentMethodId;
     private int _binaryDataLimit;
     private long _maxMessageSize;
+    private volatile boolean _transportBlockedForWriting;
 
     public AMQProtocolEngine(Broker<?> broker,
                              final NetworkConnection network,
@@ -250,6 +251,22 @@ public class AMQProtocolEngine implement
         return _authorizedSubject;
     }
 
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        for(AMQChannel channel : _channelMap.values())
+        {
+            channel.transportStateChanged();
+        }
+    }
+
     public void setNetworkConnection(NetworkConnection network)
     {
         setNetworkConnection(network, network.getSender());

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Dec 11 17:14:34 2014
@@ -136,12 +136,6 @@ public abstract class ConsumerTarget_0_8
 
         }
 
-        @Override
-        public boolean allocateCredit(ServerMessage msg)
-        {
-            return true;
-        }
-
     }
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -215,12 +209,6 @@ public abstract class ConsumerTarget_0_8
 
         }
 
-        @Override
-        public boolean allocateCredit(ServerMessage msg)
-        {
-            return true;
-        }
-
         private static final ServerTransaction.Action NOOP =
                 new ServerTransaction.Action()
                 {
@@ -250,11 +238,6 @@ public abstract class ConsumerTarget_0_8
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
         }
 
-        public boolean allocateCredit(ServerMessage msg)
-        {
-            return getCreditManager().useCreditForMessage(msg.getSize());
-        }
-
     }
 
 

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java (from r1644644, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java&r1=1644644&r2=1644704&rev=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java Thu Dec 11 17:14:34 2014
@@ -18,10 +18,13 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
 public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
     private final AtomicLong _messageCredit;
@@ -31,16 +34,6 @@ public class MessageOnlyCreditManager ex
         _messageCredit = new AtomicLong(initialCredit);
     }
 
-    public long getMessageCredit()
-    {
-        return _messageCredit.get();
-    }
-
-    public long getBytesCredit()
-    {
-        return -1L;
-    }
-
     public void restoreCredit(long messageCredit, long bytesCredit)
     {
         _messageCredit.addAndGet(messageCredit);
@@ -48,12 +41,6 @@ public class MessageOnlyCreditManager ex
 
     }
 
-    public void removeAllCredit()
-    {
-        setSuspended(true);
-        _messageCredit.set(0L);
-    }
-
     public boolean hasCredit()
     {
         return _messageCredit.get() > 0L;

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java?rev=1644704&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java Thu Dec 11 17:14:34 2014
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+
+public class NoAckCreditManager extends AbstractFlowCreditManager
+{
+    private final ServerProtocolEngine _serverProtocolEngine;
+
+    public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine)
+    {
+        _serverProtocolEngine = serverProtocolEngine;
+    }
+
+    @Override
+    public void restoreCredit(final long messageCredit, final long bytesCredit)
+    {
+        setSuspended(!hasCredit());
+    }
+
+    @Override
+    public boolean hasCredit()
+    {
+        return !_serverProtocolEngine.isTransportBlockedForWriting();
+    }
+
+    @Override
+    public boolean useCreditForMessage(final long msgSize)
+    {
+        if (!hasCredit())
+        {
+            setSuspended(true);
+            return false;
+        }
+        return true;
+    }
+}

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java (from r1644644, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java&r1=1644644&r2=1644704&rev=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java Thu Dec 11 17:14:34 2014
@@ -18,20 +18,28 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
 
 
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
 
+    private final ServerProtocolEngine _protocolEngine;
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
 
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
 
-    public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
+    public Pre0_10CreditManager(long bytesCreditLimit,
+                                long messageCreditLimit,
+                                ServerProtocolEngine protocolEngine)
     {
+        _protocolEngine = protocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
         _bytesCredit = bytesCreditLimit;
@@ -39,6 +47,7 @@ public class Pre0_10CreditManager extend
     }
 
 
+
     public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
     {
         long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
@@ -80,16 +89,6 @@ public class Pre0_10CreditManager extend
     }
 
 
-    public long getMessageCredit()
-    {
-        return _messageCredit;
-    }
-
-    public long getBytesCredit()
-    {
-        return _bytesCredit;
-    }
-
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
         final long messageCreditLimit = _messageCreditLimit;
@@ -119,22 +118,21 @@ public class Pre0_10CreditManager extend
 
     }
 
-    public synchronized void removeAllCredit()
-    {
-        _bytesCredit = 0L;
-        _messageCredit = 0L;
-        setSuspended(!hasCredit());
-    }
-
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit == 0L || _bytesCredit > 0)
-                && (_messageCreditLimit == 0L || _messageCredit > 0);
+                && (_messageCreditLimit == 0L || _messageCredit > 0)
+                && !_protocolEngine.isTransportBlockedForWriting();
     }
 
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
-        if(_messageCreditLimit != 0L)
+        if (_protocolEngine.isTransportBlockedForWriting())
+        {
+            setSuspended(true);
+            return false;
+        }
+        else if(_messageCreditLimit != 0L)
         {
             if(_messageCredit != 0L)
             {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Thu Dec 11 17:14:34 2014
@@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicCont
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
@@ -328,7 +326,7 @@ public class AckTest extends QpidTestCas
     public void testMessageDequeueRestoresCreditTest() throws Exception
     {
         // Send 10 messages
-        Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+        Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine);
 
 
         _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java (from r1644644, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java&r1=1644644&r2=1644704&rev=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java Thu Dec 11 17:14:34 2014
@@ -18,20 +18,14 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
 
 
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
 public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
-    public long getMessageCredit()
-    {
-        return -1L;
-    }
-
-    public long getBytesCredit()
-    {
-        return -1L;
-    }
 
     public void restoreCredit(long messageCredit, long bytesCredit)
     {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Dec 11 17:14:34 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.End;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Broker;
@@ -64,6 +65,7 @@ public class Connection_1_0 implements C
     private final AmqpPort<?> _port;
     private final Broker<?> _broker;
     private final SubjectCreator _subjectCreator;
+    private final ServerProtocolEngine _protocolEngine;
     private VirtualHostImpl _vhost;
     private final Transport _transport;
     private final ConnectionEndpoint _conn;
@@ -101,12 +103,16 @@ public class Connection_1_0 implements C
     private boolean _closedOnOpen;
 
 
+
     public Connection_1_0(Broker<?> broker,
                           ConnectionEndpoint conn,
                           long connectionId,
                           AmqpPort<?> port,
-                          Transport transport, final SubjectCreator subjectCreator)
+                          Transport transport,
+                          final SubjectCreator subjectCreator,
+                          final ServerProtocolEngine protocolEngine)
     {
+        _protocolEngine = protocolEngine;
         _broker = broker;
         _port = port;
         _transport = transport;
@@ -363,6 +369,11 @@ public class Connection_1_0 implements C
         return _port;
     }
 
+    public ServerProtocolEngine getProtocolEngine()
+    {
+        return _protocolEngine;
+    }
+
     @Override
     public Transport getTransport()
     {
@@ -480,4 +491,11 @@ public class Connection_1_0 implements C
     }
 
 
+    public void transportStateChanged()
+    {
+        for (Session_1_0 session : _sessions)
+        {
+            session.transportStateChanged();
+        }
+    }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Dec 11 17:14:34 2014
@@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +85,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
     public boolean isSuspended()
     {
-        return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+        return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
 
     }
 
@@ -290,7 +291,9 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         synchronized (_link.getLock())
         {
-            final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+            ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+            final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
             if(!hasCredit && getState() == State.ACTIVE)
             {
                 suspend();
@@ -330,7 +333,8 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         synchronized(_link.getLock())
         {
-            if(isSuspended() && getEndpoint() != null)
+            ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+            if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
             {
                 updateState(State.SUSPENDED, State.ACTIVE);
                 _transactionId = _link.getTransactionId();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Thu Dec 11 17:14:34 2014
@@ -118,6 +118,7 @@ public class ProtocolEngine_1_0_0_SASL i
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
     private Connection_1_0 _connection;
+    private volatile boolean _transportBlockedForWriting;
 
 
     static enum State {
@@ -216,7 +217,7 @@ public class ProtocolEngine_1_0_0_SASL i
         _endpoint.setProperties(serverProperties);
 
         _endpoint.setRemoteAddress(getRemoteAddress());
-        _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
+        _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this);
 
         _endpoint.setConnectionEventListener(_connection);
         _endpoint.setFrameOutputHandler(this);
@@ -529,6 +530,8 @@ public class ProtocolEngine_1_0_0_SASL i
 
     }
 
+
+
     public void close()
     {
         _sender.close();
@@ -559,4 +562,18 @@ public class ProtocolEngine_1_0_0_SASL i
     {
         return _lastWriteTime;
     }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        _connection.transportStateChanged();
+
+    }
+
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Dec 11 17:14:34 2014
@@ -728,4 +728,9 @@ public class SendingLink_1_0 implements
     {
         return _consumer;
     }
+
+    public ConsumerTarget_1_0 getConsumerTarget()
+    {
+        return _target;
+    }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Dec 11 17:14:34 2014
@@ -109,6 +109,7 @@ public class Session_1_0 implements Sess
     private final Subject _subject = new Subject();
 
     private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+    private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
@@ -211,7 +212,7 @@ public class Session_1_0 implements Sess
                         );
 
                         sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
-                        registerConsumer(sendingLink.getConsumer());
+                        registerConsumer(sendingLink);
 
                         link = sendingLink;
                         if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
@@ -411,12 +412,14 @@ public class Session_1_0 implements Sess
         }
     }
 
-    private void registerConsumer(final ConsumerImpl consumer)
+    private void registerConsumer(final SendingLink_1_0 link)
     {
+        ConsumerImpl consumer = link.getConsumer();
         if(consumer instanceof Consumer<?>)
         {
             Consumer<?> modelConsumer = (Consumer<?>) consumer;
             _consumers.add(modelConsumer);
+            _sendingLinks.add(link);
             modelConsumer.addChangeListener(_consumerClosedListener);
             consumerAdded(modelConsumer);
         }
@@ -609,6 +612,20 @@ public class Session_1_0 implements Sess
     }
 
     @Override
+    public void transportStateChanged()
+    {
+        for(SendingLink_1_0 link : _sendingLinks)
+        {
+            ConsumerTarget_1_0 target = link.getConsumerTarget();
+            target.flowStateChanged();
+
+
+        }
+
+
+    }
+
+    @Override
     public LogSubject getLogSubject()
     {
         return this;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java?rev=1644704&r1=1644703&r2=1644704&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java Thu Dec 11 17:14:34 2014
@@ -30,4 +30,8 @@ public interface ServerProtocolEngine ex
     long getConnectionId();
 
     Subject getSubject();
+
+    boolean isTransportBlockedForWriting();
+
+    void setTransportBlockedForWriting(boolean blocked);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org