You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/09 21:33:46 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6275

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 539d6b747 -> f471b51c2


https://issues.apache.org/jira/browse/AMQ-6275

Avoid an unnecessary connection state lookup that leads to lock ordering
issues.
(cherry picked from commit 98d20008333af380ae05d91036d123501115ab8c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d594248d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d594248d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d594248d

Branch: refs/heads/activemq-5.13.x
Commit: d594248db59995432cdbf8b1866590e1cd8d814e
Parents: 539d6b7
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 9 12:09:42 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 9 12:10:47 2016 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/broker/TransportConnection.java     | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d594248d/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 2727503..350f529 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -671,7 +671,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             broker.addConsumer(cs.getContext(), info);
             try {
                 ss.addConsumer(info);
-                addConsumerBrokerExchange(info.getConsumerId());
+                addConsumerBrokerExchange(cs, info.getConsumerId());
             } catch (IllegalStateException e) {
                 broker.removeConsumer(cs.getContext(), info);
             }
@@ -1513,15 +1513,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         return result;
     }
 
-    private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
+    private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
         ConsumerBrokerExchange result = consumerExchanges.get(id);
         if (result == null) {
             synchronized (consumerExchanges) {
                 result = new ConsumerBrokerExchange();
-                TransportConnectionState state = lookupConnectionState(id);
-                context = state.getContext();
+                context = connectionState.getContext();
                 result.setConnectionContext(context);
-                SessionState ss = state.getSessionState(id.getParentId());
+                SessionState ss = connectionState.getSessionState(id.getParentId());
                 if (ss != null) {
                     ConsumerState cs = ss.getConsumerState(id);
                     if (cs != null) {


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6319

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6319

Improve configuration for disabling non-SASL connections.
(cherry picked from commit c49db029ab280dcc1755d855f3641526ad2a5e90)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f471b51c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f471b51c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f471b51c

Branch: refs/heads/activemq-5.13.x
Commit: f471b51c2a3554b84f3d2d22f3730e243ca71ee5
Parents: d594248
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 9 17:32:41 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 9 17:33:27 2016 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpWireFormat.java |  8 ++-
 .../transport/amqp/AmqpWireFormatFactory.java   | 10 ++++
 .../amqp/protocol/UnsupportedClientTest.java    | 55 ++++++++++++++++++++
 3 files changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f471b51c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index b5c8f59..149eb70 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat {
     public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
     public static final int DEFAULT_IDLE_TIMEOUT = 30000;
     public static final int DEFAULT_PRODUCER_CREDIT = 1000;
+    public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = true;
 
     private static final int SASL_PROTOCOL = 3;
 
@@ -50,6 +51,7 @@ public class AmqpWireFormat implements WireFormat {
     private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
     private int producerCredit = DEFAULT_PRODUCER_CREDIT;
     private String transformer = InboundTransformer.TRANSFORMER_JMS;
+    private boolean allowNonSaslConnections = DEFAULT_ALLOW_NON_SASL_CONNECTIONS;
 
     private boolean magicRead = false;
     private ResetListener resetListener;
@@ -58,8 +60,6 @@ public class AmqpWireFormat implements WireFormat {
         void onProtocolReset();
     }
 
-    private boolean allowNonSaslConnections = true;
-
     @Override
     public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -131,6 +131,10 @@ public class AmqpWireFormat implements WireFormat {
             return false;
         }
 
+        if (!(header.getProtocolId() == 0 || header.getProtocolId() == 3)) {
+            return false;
+        }
+
         if (!isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f471b51c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
index fb7aea4..bb428b4 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -30,6 +30,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
     private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
     private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
+    private boolean allowNonSaslConnections = AmqpWireFormat.DEFAULT_ALLOW_NON_SASL_CONNECTIONS;
 
     @Override
     public WireFormat createWireFormat() {
@@ -40,6 +41,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
         wireFormat.setIdleTimeout(getIdelTimeout());
         wireFormat.setProducerCredit(getProducerCredit());
         wireFormat.setTransformer(getTransformer());
+        wireFormat.setAllowNonSaslConnections(isAllowNonSaslConnections());
 
         return wireFormat;
     }
@@ -83,4 +85,12 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
     public void setTransformer(String transformer) {
         this.transformer = transformer;
     }
+
+    public boolean isAllowNonSaslConnections() {
+        return allowNonSaslConnections;
+    }
+
+    public void setAllowNonSaslConnections(boolean allowNonSaslConnections) {
+        this.allowNonSaslConnections = allowNonSaslConnections;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f471b51c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
index d71aee2..ae8f1a3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
@@ -60,11 +60,17 @@ public class UnsupportedClientTest extends AmqpTestSupport {
         super.setUp();
     }
 
+    @Override
+    public String getAdditionalConfig() {
+        return "&wireFormat.allowNonSaslConnections=false";
+    }
+
     @Test(timeout = 60000)
     public void testOlderProtocolIsRejected() throws Exception {
 
         AmqpHeader header = new AmqpHeader();
 
+        header.setProtocolId(3);
         header.setMajor(0);
         header.setMinor(9);
         header.setRevision(1);
@@ -87,6 +93,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
 
         AmqpHeader header = new AmqpHeader();
 
+        header.setProtocolId(3);
         header.setMajor(2);
         header.setMinor(0);
         header.setRevision(0);
@@ -109,6 +116,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
 
         AmqpHeader header = new AmqpHeader();
 
+        header.setProtocolId(3);
         header.setMajor(1);
         header.setMinor(1);
         header.setRevision(0);
@@ -131,6 +139,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
 
         AmqpHeader header = new AmqpHeader();
 
+        header.setProtocolId(3);
         header.setMajor(1);
         header.setMinor(0);
         header.setRevision(1);
@@ -149,6 +158,52 @@ public class UnsupportedClientTest extends AmqpTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testNonSaslClientIsRejected() throws Exception {
+
+        AmqpHeader header = new AmqpHeader();
+
+        header.setProtocolId(0);
+        header.setMajor(1);
+        header.setMinor(0);
+        header.setRevision(0);
+
+        // Test TCP
+        doTestInvalidHeaderProcessing(amqpPort, header, false);
+
+        // Test SSL
+        doTestInvalidHeaderProcessing(amqpSslPort, header, true);
+
+        // Test NIO
+        doTestInvalidHeaderProcessing(amqpNioPort, header, false);
+
+        // Test NIO+SSL
+        doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testUnkownProtocolIdIsRejected() throws Exception {
+
+        AmqpHeader header = new AmqpHeader();
+
+        header.setProtocolId(5);
+        header.setMajor(1);
+        header.setMinor(0);
+        header.setRevision(0);
+
+        // Test TCP
+        doTestInvalidHeaderProcessing(amqpPort, header, false);
+
+        // Test SSL
+        doTestInvalidHeaderProcessing(amqpSslPort, header, true);
+
+        // Test NIO
+        doTestInvalidHeaderProcessing(amqpNioPort, header, false);
+
+        // Test NIO+SSL
+        doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
+    }
+
+    @Test(timeout = 60000)
     public void testInvalidProtocolHeader() throws Exception {
 
         AmqpHeader header = new AmqpHeader(new Buffer(new byte[]{'S', 'T', 'O', 'M', 'P', 0, 0, 0}), false);