You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/11/18 10:51:00 UTC

svn commit: r1714959 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/

Author: rgodfrey
Date: Wed Nov 18 09:51:00 2015
New Revision: 1714959

URL: http://svn.apache.org/viewvc?rev=1714959&view=rev
Log:
QPID-6865 : Avoid deadlock when calling setMessageAssignmentSuspended

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Wed Nov 18 09:51:00 2015
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
@@ -402,19 +401,21 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers)
     {
         _messageAssignmentSuspended.set(messageAssignmentSuspended);
-
-        for(AMQSessionModel<?> session : getSessionModels())
+        if(notifyConsumers)
         {
-            if (messageAssignmentSuspended)
-            {
-                session.ensureConsumersNoticedStateChange();
-            }
-            else
+            for (AMQSessionModel<?> session : getSessionModels())
             {
-                session.notifyConsumerTargetCurrentStates();
+                if (messageAssignmentSuspended)
+                {
+                    session.ensureConsumersNoticedStateChange();
+                }
+                else
+                {
+                    session.notifyConsumerTargetCurrentStates();
+                }
             }
         }
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Wed Nov 18 09:51:00 2015
@@ -92,9 +92,9 @@ public class MultiVersionProtocolEngine
     }
 
     @Override
-    public void setMessageAssignmentSuspended(final boolean value)
+    public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
     {
-        _delegate.setMessageAssignmentSuspended(value);
+        _delegate.setMessageAssignmentSuspended(value, notifyConsumers);
     }
 
     @Override
@@ -239,7 +239,7 @@ public class MultiVersionProtocolEngine
     {
 
         @Override
-        public void setMessageAssignmentSuspended(final boolean value)
+        public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
         {
 
         }
@@ -361,7 +361,7 @@ public class MultiVersionProtocolEngine
         private final AtomicBoolean _hasWork = new AtomicBoolean();
 
         @Override
-        public void setMessageAssignmentSuspended(final boolean value)
+        public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
         {
         }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Nov 18 09:51:00 2015
@@ -208,7 +208,7 @@ public class NonBlockingConnection imple
     {
         if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
         {
-            _protocolEngine.setMessageAssignmentSuspended(true);
+            _protocolEngine.setMessageAssignmentSuspended(true, false);
         }
     }
 
@@ -242,7 +242,7 @@ public class NonBlockingConnection imple
                 }
 
                 _protocolEngine.setIOThread(Thread.currentThread());
-                _protocolEngine.setMessageAssignmentSuspended(true);
+                _protocolEngine.setMessageAssignmentSuspended(true, true);
 
                 boolean processPendingComplete = processPending();
 
@@ -260,7 +260,7 @@ public class NonBlockingConnection imple
 
                     if (_fullyWritten)
                     {
-                        _protocolEngine.setMessageAssignmentSuspended(false);
+                        _protocolEngine.setMessageAssignmentSuspended(false, true);
                     }
                 }
                 else

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Wed Nov 18 09:51:00 2015
@@ -54,7 +54,7 @@ public interface ProtocolEngine extends
 
     void setTransportBlockedForWriting(boolean blocked);
 
-    void setMessageAssignmentSuspended(boolean value);
+    void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers);
 
     boolean isMessageAssignmentSuspended();
 

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Wed Nov 18 09:51:00 2015
@@ -246,7 +246,7 @@ class WebSocketProvider implements Accep
                 try
                 {
                     _protocolEngine.setIOThread(Thread.currentThread());
-                    _protocolEngine.setMessageAssignmentSuspended(true);
+                    _protocolEngine.setMessageAssignmentSuspended(true, true);
                     Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
                     while(iter.hasNext())
                     {
@@ -261,7 +261,7 @@ class WebSocketProvider implements Accep
 
                     _connectionWrapper.doWrite();
 
-                    _protocolEngine.setMessageAssignmentSuspended(false);
+                    _protocolEngine.setMessageAssignmentSuspended(false, true);
                 }
                 finally
                 {
@@ -418,7 +418,7 @@ class WebSocketProvider implements Accep
         {
             if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
             {
-                _protocolEngine.setMessageAssignmentSuspended(true);
+                _protocolEngine.setMessageAssignmentSuspended(true, false);
             }
         }
 
@@ -469,7 +469,7 @@ class WebSocketProvider implements Accep
             try
             {
                 _protocolEngine.setIOThread(Thread.currentThread());
-                _protocolEngine.setMessageAssignmentSuspended(true);
+                _protocolEngine.setMessageAssignmentSuspended(true, true);
 
                 Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
                 while(iter.hasNext())
@@ -479,7 +479,7 @@ class WebSocketProvider implements Accep
 
                 doWrite();
 
-                _protocolEngine.setMessageAssignmentSuspended(false);
+                _protocolEngine.setMessageAssignmentSuspended(false, true);
             }
             finally
             {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org