You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/11/18 10:51:00 UTC
svn commit: r1714959 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/
Author: rgodfrey
Date: Wed Nov 18 09:51:00 2015
New Revision: 1714959
URL: http://svn.apache.org/viewvc?rev=1714959&view=rev
Log:
QPID-6865 : Avoid deadlock when calling setMessageAssignmentSuspended
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Wed Nov 18 09:51:00 2015
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.ConnectionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
@@ -402,19 +401,21 @@ public abstract class AbstractAMQPConnec
}
@Override
- public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers)
{
_messageAssignmentSuspended.set(messageAssignmentSuspended);
-
- for(AMQSessionModel<?> session : getSessionModels())
+ if(notifyConsumers)
{
- if (messageAssignmentSuspended)
- {
- session.ensureConsumersNoticedStateChange();
- }
- else
+ for (AMQSessionModel<?> session : getSessionModels())
{
- session.notifyConsumerTargetCurrentStates();
+ if (messageAssignmentSuspended)
+ {
+ session.ensureConsumersNoticedStateChange();
+ }
+ else
+ {
+ session.notifyConsumerTargetCurrentStates();
+ }
}
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Wed Nov 18 09:51:00 2015
@@ -92,9 +92,9 @@ public class MultiVersionProtocolEngine
}
@Override
- public void setMessageAssignmentSuspended(final boolean value)
+ public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
{
- _delegate.setMessageAssignmentSuspended(value);
+ _delegate.setMessageAssignmentSuspended(value, notifyConsumers);
}
@Override
@@ -239,7 +239,7 @@ public class MultiVersionProtocolEngine
{
@Override
- public void setMessageAssignmentSuspended(final boolean value)
+ public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
{
}
@@ -361,7 +361,7 @@ public class MultiVersionProtocolEngine
private final AtomicBoolean _hasWork = new AtomicBoolean();
@Override
- public void setMessageAssignmentSuspended(final boolean value)
+ public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
{
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Nov 18 09:51:00 2015
@@ -208,7 +208,7 @@ public class NonBlockingConnection imple
{
if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
{
- _protocolEngine.setMessageAssignmentSuspended(true);
+ _protocolEngine.setMessageAssignmentSuspended(true, false);
}
}
@@ -242,7 +242,7 @@ public class NonBlockingConnection imple
}
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true);
+ _protocolEngine.setMessageAssignmentSuspended(true, true);
boolean processPendingComplete = processPending();
@@ -260,7 +260,7 @@ public class NonBlockingConnection imple
if (_fullyWritten)
{
- _protocolEngine.setMessageAssignmentSuspended(false);
+ _protocolEngine.setMessageAssignmentSuspended(false, true);
}
}
else
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Wed Nov 18 09:51:00 2015
@@ -54,7 +54,7 @@ public interface ProtocolEngine extends
void setTransportBlockedForWriting(boolean blocked);
- void setMessageAssignmentSuspended(boolean value);
+ void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers);
boolean isMessageAssignmentSuspended();
Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714959&r1=1714958&r2=1714959&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Wed Nov 18 09:51:00 2015
@@ -246,7 +246,7 @@ class WebSocketProvider implements Accep
try
{
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true);
+ _protocolEngine.setMessageAssignmentSuspended(true, true);
Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
while(iter.hasNext())
{
@@ -261,7 +261,7 @@ class WebSocketProvider implements Accep
_connectionWrapper.doWrite();
- _protocolEngine.setMessageAssignmentSuspended(false);
+ _protocolEngine.setMessageAssignmentSuspended(false, true);
}
finally
{
@@ -418,7 +418,7 @@ class WebSocketProvider implements Accep
{
if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
{
- _protocolEngine.setMessageAssignmentSuspended(true);
+ _protocolEngine.setMessageAssignmentSuspended(true, false);
}
}
@@ -469,7 +469,7 @@ class WebSocketProvider implements Accep
try
{
_protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true);
+ _protocolEngine.setMessageAssignmentSuspended(true, true);
Iterator<Runnable> iter = _protocolEngine.processPendingIterator();
while(iter.hasNext())
@@ -479,7 +479,7 @@ class WebSocketProvider implements Accep
doWrite();
- _protocolEngine.setMessageAssignmentSuspended(false);
+ _protocolEngine.setMessageAssignmentSuspended(false, true);
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org