You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/09 21:33:46 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6275
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x 539d6b747 -> f471b51c2
https://issues.apache.org/jira/browse/AMQ-6275
Avoid unnecessary connection state lookup leading to lock ordering
issues.
(cherry picked from commit 98d20008333af380ae05d91036d123501115ab8c)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d594248d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d594248d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d594248d
Branch: refs/heads/activemq-5.13.x
Commit: d594248db59995432cdbf8b1866590e1cd8d814e
Parents: 539d6b7
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 9 12:09:42 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 9 12:10:47 2016 -0400
----------------------------------------------------------------------
.../org/apache/activemq/broker/TransportConnection.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d594248d/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 2727503..350f529 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -671,7 +671,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
- addConsumerBrokerExchange(info.getConsumerId());
+ addConsumerBrokerExchange(cs, info.getConsumerId());
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
@@ -1513,15 +1513,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return result;
}
- private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
+ private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
ConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) {
synchronized (consumerExchanges) {
result = new ConsumerBrokerExchange();
- TransportConnectionState state = lookupConnectionState(id);
- context = state.getContext();
+ context = connectionState.getContext();
result.setConnectionContext(context);
- SessionState ss = state.getSessionState(id.getParentId());
+ SessionState ss = connectionState.getSessionState(id.getParentId());
if (ss != null) {
ConsumerState cs = ss.getConsumerState(id);
if (cs != null) {
[2/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6319
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6319
Improve configuration for disabling non-SASL connections.
(cherry picked from commit c49db029ab280dcc1755d855f3641526ad2a5e90)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f471b51c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f471b51c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f471b51c
Branch: refs/heads/activemq-5.13.x
Commit: f471b51c2a3554b84f3d2d22f3730e243ca71ee5
Parents: d594248
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 9 17:32:41 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 9 17:33:27 2016 -0400
----------------------------------------------------------------------
.../activemq/transport/amqp/AmqpWireFormat.java | 8 ++-
.../transport/amqp/AmqpWireFormatFactory.java | 10 ++++
.../amqp/protocol/UnsupportedClientTest.java | 55 ++++++++++++++++++++
3 files changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/f471b51c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index b5c8f59..149eb70 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat {
public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
public static final int DEFAULT_IDLE_TIMEOUT = 30000;
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
+ public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = true;
private static final int SASL_PROTOCOL = 3;
@@ -50,6 +51,7 @@ public class AmqpWireFormat implements WireFormat {
private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
private int producerCredit = DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_JMS;
+ private boolean allowNonSaslConnections = DEFAULT_ALLOW_NON_SASL_CONNECTIONS;
private boolean magicRead = false;
private ResetListener resetListener;
@@ -58,8 +60,6 @@ public class AmqpWireFormat implements WireFormat {
void onProtocolReset();
}
- private boolean allowNonSaslConnections = true;
-
@Override
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -131,6 +131,10 @@ public class AmqpWireFormat implements WireFormat {
return false;
}
+ if (!(header.getProtocolId() == 0 || header.getProtocolId() == 3)) {
+ return false;
+ }
+
if (!isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f471b51c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
index fb7aea4..bb428b4 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -30,6 +30,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
+ private boolean allowNonSaslConnections = AmqpWireFormat.DEFAULT_ALLOW_NON_SASL_CONNECTIONS;
@Override
public WireFormat createWireFormat() {
@@ -40,6 +41,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
wireFormat.setIdleTimeout(getIdelTimeout());
wireFormat.setProducerCredit(getProducerCredit());
wireFormat.setTransformer(getTransformer());
+ wireFormat.setAllowNonSaslConnections(isAllowNonSaslConnections());
return wireFormat;
}
@@ -83,4 +85,12 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
public void setTransformer(String transformer) {
this.transformer = transformer;
}
+
+ public boolean isAllowNonSaslConnections() {
+ return allowNonSaslConnections;
+ }
+
+ public void setAllowNonSaslConnections(boolean allowNonSaslConnections) {
+ this.allowNonSaslConnections = allowNonSaslConnections;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f471b51c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
index d71aee2..ae8f1a3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java
@@ -60,11 +60,17 @@ public class UnsupportedClientTest extends AmqpTestSupport {
super.setUp();
}
+ @Override
+ public String getAdditionalConfig() {
+ return "&wireFormat.allowNonSaslConnections=false";
+ }
+
@Test(timeout = 60000)
public void testOlderProtocolIsRejected() throws Exception {
AmqpHeader header = new AmqpHeader();
+ header.setProtocolId(3);
header.setMajor(0);
header.setMinor(9);
header.setRevision(1);
@@ -87,6 +93,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
AmqpHeader header = new AmqpHeader();
+ header.setProtocolId(3);
header.setMajor(2);
header.setMinor(0);
header.setRevision(0);
@@ -109,6 +116,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
AmqpHeader header = new AmqpHeader();
+ header.setProtocolId(3);
header.setMajor(1);
header.setMinor(1);
header.setRevision(0);
@@ -131,6 +139,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
AmqpHeader header = new AmqpHeader();
+ header.setProtocolId(3);
header.setMajor(1);
header.setMinor(0);
header.setRevision(1);
@@ -149,6 +158,52 @@ public class UnsupportedClientTest extends AmqpTestSupport {
}
@Test(timeout = 60000)
+ public void testNonSaslClientIsRejected() throws Exception {
+
+ AmqpHeader header = new AmqpHeader();
+
+ header.setProtocolId(0);
+ header.setMajor(1);
+ header.setMinor(0);
+ header.setRevision(0);
+
+ // Test TCP
+ doTestInvalidHeaderProcessing(amqpPort, header, false);
+
+ // Test SSL
+ doTestInvalidHeaderProcessing(amqpSslPort, header, true);
+
+ // Test NIO
+ doTestInvalidHeaderProcessing(amqpNioPort, header, false);
+
+ // Test NIO+SSL
+ doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
+ }
+
+ @Test(timeout = 60000)
+ public void testUnkownProtocolIdIsRejected() throws Exception {
+
+ AmqpHeader header = new AmqpHeader();
+
+ header.setProtocolId(5);
+ header.setMajor(1);
+ header.setMinor(0);
+ header.setRevision(0);
+
+ // Test TCP
+ doTestInvalidHeaderProcessing(amqpPort, header, false);
+
+ // Test SSL
+ doTestInvalidHeaderProcessing(amqpSslPort, header, true);
+
+ // Test NIO
+ doTestInvalidHeaderProcessing(amqpNioPort, header, false);
+
+ // Test NIO+SSL
+ doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
+ }
+
+ @Test(timeout = 60000)
public void testInvalidProtocolHeader() throws Exception {
AmqpHeader header = new AmqpHeader(new Buffer(new byte[]{'S', 'T', 'O', 'M', 'P', 0, 0, 0}), false);