You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/11/24 14:01:28 UTC
svn commit: r1716127 -
/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Author: kwall
Date: Tue Nov 24 13:01:27 2015
New Revision: 1716127
URL: http://svn.apache.org/viewvc?rev=1716127&view=rev
Log:
QPID-6900: [Java Broker] Address unsafe publication issue in AMQConnection_0_8
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1716127&r1=1716126&r2=1716127&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Nov 24 13:01:27 2015
@@ -66,6 +66,7 @@ import org.apache.qpid.server.model.Prot
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -127,26 +128,26 @@ public class AMQPConnection_0_8
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
* on after handling the frames.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
+ private final Set<AMQChannel> _channelsForCurrentMessage = Collections.newSetFromMap(new ConcurrentHashMap<AMQChannel, Boolean>());
private final ServerDecoder _decoder;
- private SaslServer _saslServer;
+ private volatile SaslServer _saslServer;
- private long _maxNoOfChannels;
+ private volatile long _maxNoOfChannels;
- private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
- private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
+ private volatile ProtocolVersion _protocolVersion;
+ private volatile MethodRegistry _methodRegistry;
private final Queue<Action<? super AMQPConnection_0_8>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
private final Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
- private ProtocolOutputConverter _protocolOutputConverter;
+ private volatile ProtocolOutputConverter _protocolOutputConverter;
private final Object _reference = new Object();
- private int _maxFrameSize;
+ private volatile int _maxFrameSize;
private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
private final ServerNetworkConnection _network;
@@ -157,8 +158,8 @@ public class AMQPConnection_0_8
private boolean _blocking;
private volatile boolean _closeWhenNoRoute;
- private boolean _compressionSupported;
- private int _messageCompressionThreshold;
+ private volatile boolean _compressionSupported;
+ private volatile int _messageCompressionThreshold;
/**
* QPID-6744 - Older queue clients (<=0.32) set the nowait flag false on the queue.delete method and then
@@ -168,10 +169,10 @@ public class AMQPConnection_0_8
private volatile boolean _sendQueueDeleteOkRegardless;
private final Pattern _sendQueueDeleteOkRegardlessClientVerRegexp;
- private int _currentClassId;
- private int _currentMethodId;
- private int _binaryDataLimit;
- private long _maxMessageSize;
+ private volatile int _currentClassId;
+ private volatile int _currentMethodId;
+ private final int _binaryDataLimit;
+ private final long _maxMessageSize;
private volatile boolean _transportBlockedForWriting;
public AMQPConnection_0_8(Broker<?> broker,
@@ -694,8 +695,10 @@ public class AMQPConnection_0_8
private void setProtocolVersion(ProtocolVersion pv)
{
+ // TODO MultiVersionProtocolEngine takes responsibility for making the ProtocolVersion determination.
+ // These steps could be moved to construction.
_protocolVersion = pv;
- _methodRegistry.setProtocolVersion(_protocolVersion);
+ _methodRegistry = new MethodRegistry(_protocolVersion);
_protocolOutputConverter = new ProtocolOutputConverterImpl(this);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org