You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by br...@apache.org on 2021/11/24 10:28:26 UTC
[activemq-artemis] 02/02: ARTEMIS-3569 Validate users on AMQP remote open
This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit f8472fd736eff21d21ca2ab0c5a4dd22f0f1d00e
Author: Domenico Francesco Bruscino <br...@apache.org>
AuthorDate: Tue Nov 23 11:18:23 2021 +0100
ARTEMIS-3569 Validate users on AMQP remote open
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 17 ++-----
.../amqp/proton/AMQPConnectionContext.java | 53 +++++++++++++++++++++-
.../protocol/amqp/proton/AMQPRedirectHandler.java | 2 +-
.../core/protocol/mqtt/MQTTRedirectHandler.java | 2 +-
.../management/impl/BrokerBalancerControlImpl.java | 14 +++---
.../core/impl/ActiveMQRedirectHandler.java | 2 +-
.../core/server/balancing/BrokerBalancer.java | 8 ++--
.../core/server/balancing/RedirectContext.java | 2 +-
.../core/server/balancing/RedirectHandler.java | 2 +-
.../server/balancing/targets/TargetResult.java | 17 +++++--
.../core/server/impl/ActiveMQServerImpl.java | 3 ++
.../core/server/balancing/BrokerBalancerTest.java | 2 +-
12 files changed, 87 insertions(+), 37 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index afaa281..c78883a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -58,7 +58,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
-import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -194,30 +193,20 @@ public class AMQPSessionCallback implements SessionCallback {
String name = UUIDGenerator.getInstance().generateStringUUID();
- String user = null;
- String passcode = null;
- if (saslResult != null) {
- user = saslResult.getUser();
- if (saslResult instanceof PlainSASLResult) {
- passcode = ((PlainSASLResult) saslResult).getPassword();
- }
- }
-
if (connection.isBridgeConnection()) {
serverSession = manager.getServer().createInternalSession(name, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
false, // boolean autoCommitSends
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
+ null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
} else {
- final String validatedUser = manager.getServer().validateUser(user, passcode, protonSPI.getProtonConnectionDelegate(), manager.getSecurityDomain());
- serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
+ serverSession = manager.getServer().createSession(name, connection.getUser(), connection.getPassword(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
false, // boolean autoCommitSends
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), validatedUser);
+ null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser());
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index aca83de..f11e6cb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.net.URI;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -61,6 +62,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
@@ -109,6 +111,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
private final ScheduleOperator scheduleOp = new ScheduleOperator(new ScheduleRunnable());
private final AtomicReference<Future<?>> scheduledFutureRef = new AtomicReference(VOID_FUTURE);
+ private String user;
+ private String password;
+ private String validatedUser;
+
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
AMQPConnectionCallback connectionSP,
String containerId,
@@ -238,6 +244,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return handler;
}
+ public String getUser() {
+ return user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getValidatedUser() {
+ return validatedUser;
+ }
+
public void destroy() {
handler.runLater(() -> connectionCallback.close());
}
@@ -530,8 +548,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
log.error("Error init connection", e);
}
- if ((connectionCallback.getTransportConnection().getRedirectTo() != null && protocolManager.getRedirectHandler()
- .redirect(this, connection)) || !validateConnection(connection)) {
+ if (!validateUser(connection) || (connectionCallback.getTransportConnection().getRedirectTo() != null
+ && protocolManager.getRedirectHandler().redirect(this, connection)) || !validateConnection(connection)) {
connection.close();
} else {
connection.setContext(AMQPConnectionContext.this);
@@ -557,6 +575,37 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
+ private boolean validateUser(Connection connection) throws Exception {
+ user = null;
+ password = null;
+ validatedUser = null;
+
+ SASLResult saslResult = getSASLResult();
+ if (saslResult != null) {
+ user = saslResult.getUser();
+ if (saslResult instanceof PlainSASLResult) {
+ password = ((PlainSASLResult) saslResult).getPassword();
+ }
+ }
+
+ if (isIncomingConnection() && saslClientFactory == null && !isBridgeConnection()) {
+ try {
+ validatedUser = protocolManager.getServer().validateUser(user, password, connectionCallback.getProtonConnectionDelegate(), protocolManager.getSecurityDomain());
+ } catch (ActiveMQSecurityException e) {
+ log.warn(e.getMessage(), e);
+ ErrorCondition error = new ErrorCondition();
+ error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+ error.setDescription(e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage());
+ connection.setCondition(error);
+ connection.setProperties(Collections.singletonMap(AmqpSupport.CONNECTION_OPEN_FAILED, true));
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
class ScheduleOperator implements UnaryOperator<Future<?>> {
private long delay;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
index 31752a8..92abb55 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
@@ -44,7 +44,7 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
protected void cannotRedirect(AMQPRedirectContext context) {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.CONNECTION_FORCED);
- switch (context.getResult().status) {
+ switch (context.getResult().getStatus()) {
case REFUSED_USE_ANOTHER:
error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo()));
break;
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
index 15dcc4a..6565f2a 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
@@ -37,7 +37,7 @@ public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
@Override
protected void cannotRedirect(MQTTRedirectContext context) {
- switch (context.getResult().status) {
+ switch (context.getResult().getStatus()) {
case REFUSED_USE_ANOTHER:
context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER);
break;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
index 3747cb7..eb637ec 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
@@ -61,9 +61,9 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public CompositeData getTarget(String key) throws Exception {
TargetResult result = balancer.getTarget(key);
- if (TargetResult.Status.OK == result.status) {
+ if (TargetResult.Status.OK == result.getStatus()) {
CompositeData connectorData = null;
- TransportConfiguration connector = result.target.getConnector();
+ TransportConfiguration connector = result.getTarget().getConnector();
if (connector != null) {
TabularData paramsData = new TabularDataSupport(getParametersType());
@@ -79,7 +79,7 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
return new CompositeDataSupport(getTargetCompositeType(),
new String[]{"nodeID", "local", "connector"},
- new Object[]{result.target.getNodeID(), result.target.isLocal(), connectorData});
+ new Object[]{result.getTarget().getNodeID(), result.getTarget().isLocal(), connectorData});
}
return null;
@@ -88,12 +88,12 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public String getTargetAsJSON(String key) {
TargetResult result = balancer.getTarget(key);
- if (TargetResult.Status.OK == result.status) {
- TransportConfiguration connector = result.target.getConnector();
+ if (TargetResult.Status.OK == result.getStatus()) {
+ TransportConfiguration connector = result.getTarget().getConnector();
JsonObjectBuilder targetDataBuilder = JsonLoader.createObjectBuilder()
- .add("nodeID", result.target.getNodeID())
- .add("local", result.target.isLocal());
+ .add("nodeID", result.getTarget().getNodeID())
+ .add("local", result.getTarget().isLocal());
if (connector == null) {
targetDataBuilder.addNull("connector");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
index 9be389c..bf775ab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
@@ -40,7 +40,7 @@ public class ActiveMQRedirectHandler extends RedirectHandler<ActiveMQRedirectCon
@Override
public void cannotRedirect(ActiveMQRedirectContext context) throws Exception {
- switch (context.getResult().status) {
+ switch (context.getResult().getStatus()) {
case REFUSED_UNAVAILABLE:
throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
case REFUSED_USE_ANOTHER:
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index 2aa69db..c256209 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -68,7 +68,7 @@ public class BrokerBalancer implements ActiveMQComponent {
}
public Target getLocalTarget() {
- return localTarget.target;
+ return localTarget.getTarget();
}
public String getLocalTargetFilter() {
@@ -168,16 +168,16 @@ public class BrokerBalancer implements ActiveMQComponent {
}
if (result != null) {
- if (pool.isTargetReady(result.target)) {
+ if (pool.isTargetReady(result.getTarget())) {
if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + result.target + "] ready for " + targetKey + "[" + key + "]");
+ logger.debug("The cache returns [" + result.getTarget() + "] ready for " + targetKey + "[" + key + "]");
}
return result;
}
if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + result.target + "] not ready for " + targetKey + "[" + key + "]");
+ logger.debug("The cache returns [" + result.getTarget() + "] not ready for " + targetKey + "[" + key + "]");
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
index 5534a56..697fcbc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
@@ -43,7 +43,7 @@ public class RedirectContext {
}
public Target getTarget() {
- return result.target;
+ return result.getTarget();
}
public TargetResult getResult() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
index 967e686..0e1ee4c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
@@ -54,7 +54,7 @@ public abstract class RedirectHandler<T extends RedirectContext> {
context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
- if (TargetResult.Status.OK != context.getResult().status) {
+ if (TargetResult.Status.OK != context.getResult().getStatus()) {
ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection);
cannotRedirect(context);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
index 1be2ff9..cd161c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
@@ -22,16 +22,25 @@ public class TargetResult {
public static final TargetResult REFUSED_UNAVAILABLE_RESULT = new TargetResult(Status.REFUSED_UNAVAILABLE);
public static final TargetResult REFUSED_USE_ANOTHER_RESULT = new TargetResult(Status.REFUSED_USE_ANOTHER);
- public Status status;
- public Target target;
+ private final Status status;
+ private final Target target;
- public TargetResult(Target t) {
- this.target = t;
+ public Status getStatus() {
+ return status;
+ }
+
+ public Target getTarget() {
+ return target;
+ }
+
+ public TargetResult(Target target) {
this.status = Status.OK;
+ this.target = target;
}
private TargetResult(Status s) {
this.status = s;
+ this.target = null;
}
public enum Status {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c3f694f..be7a540 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1676,6 +1676,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Map<SimpleString, RoutingType> prefixes,
final String securityDomain,
String validatedUser) throws Exception {
+ if (validatedUser == null) {
+ validatedUser = validateUser(username, password, connection, securityDomain);
+ }
checkSessionLimit(validatedUser);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index f41aca7..7f8ca0e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -74,7 +74,7 @@ public class BrokerBalancerTest {
@Test
public void getTarget() {
- assertEquals( localTarget, underTest.getTarget("FOO_EE").target);
+ assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}