You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/11/26 15:17:36 UTC

svn commit: r1716667 - in /qpid/java/branches/6.0.x: ./ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java

Author: orudyy
Date: Thu Nov 26 14:17:35 2015
New Revision: 1716667

URL: http://svn.apache.org/viewvc?rev=1716667&view=rev
Log:
QPID-6900: [Java Broker] Address unsafe publication issue in AMQConnection_0_8
------------------------------------------------------------------------
Merged from trunk with command:
svn merge -c r1716127 https://svn.apache.org/repos/asf/qpid/java/trunk

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 26 14:17:35 2015
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1716667&r1=1716666&r2=1716667&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Thu Nov 26 14:17:35 2015
@@ -66,6 +66,7 @@ import org.apache.qpid.server.model.Prot
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.security.*;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -127,26 +128,26 @@ public class AMQPConnection_0_8
      * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
      * on after handling the frames.
      */
-    private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
+    private final Set<AMQChannel> _channelsForCurrentMessage = Collections.newSetFromMap(new ConcurrentHashMap<AMQChannel, Boolean>());
 
     private final ServerDecoder _decoder;
 
-    private SaslServer _saslServer;
+    private volatile SaslServer _saslServer;
 
-    private long _maxNoOfChannels;
+    private volatile long _maxNoOfChannels;
 
-    private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-    private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
+    private volatile ProtocolVersion _protocolVersion;
+    private volatile MethodRegistry _methodRegistry;
 
     private final Queue<Action<? super AMQPConnection_0_8>> _asyncTaskList =
             new ConcurrentLinkedQueue<>();
 
     private final Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
-    private ProtocolOutputConverter _protocolOutputConverter;
+    private volatile ProtocolOutputConverter _protocolOutputConverter;
 
     private final Object _reference = new Object();
 
-    private int _maxFrameSize;
+    private volatile int _maxFrameSize;
     private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
 
     private final ServerNetworkConnection _network;
@@ -157,8 +158,8 @@ public class AMQPConnection_0_8
     private boolean _blocking;
 
     private volatile boolean _closeWhenNoRoute;
-    private boolean _compressionSupported;
-    private int _messageCompressionThreshold;
+    private volatile boolean _compressionSupported;
+    private volatile int _messageCompressionThreshold;
 
     /**
      * QPID-6744 - Older queue clients (<=0.32) set the nowait flag false on the queue.delete method and then
@@ -168,10 +169,10 @@ public class AMQPConnection_0_8
     private volatile boolean _sendQueueDeleteOkRegardless;
     private final Pattern _sendQueueDeleteOkRegardlessClientVerRegexp;
 
-    private int _currentClassId;
-    private int _currentMethodId;
-    private int _binaryDataLimit;
-    private long _maxMessageSize;
+    private volatile int _currentClassId;
+    private volatile int _currentMethodId;
+    private final int _binaryDataLimit;
+    private final long _maxMessageSize;
     private volatile boolean _transportBlockedForWriting;
 
     public AMQPConnection_0_8(Broker<?> broker,
@@ -694,8 +695,10 @@ public class AMQPConnection_0_8
 
     private void setProtocolVersion(ProtocolVersion pv)
     {
+        // TODO MultiVersionProtocolEngine takes responsibility for making the ProtocolVersion determination.
+        // These steps could be moved to construction.
         _protocolVersion = pv;
-        _methodRegistry.setProtocolVersion(_protocolVersion);
+        _methodRegistry = new MethodRegistry(_protocolVersion);
         _protocolOutputConverter = new ProtocolOutputConverterImpl(this);
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org