You are viewing a plain text version of this content. The canonical link for it was not preserved in this text version.
Posted to commits@activemq.apache.org by br...@apache.org on 2021/11/24 10:28:26 UTC

[activemq-artemis] 02/02: ARTEMIS-3569 Validate users on AMQP remote open

This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f8472fd736eff21d21ca2ab0c5a4dd22f0f1d00e
Author: Domenico Francesco Bruscino <br...@apache.org>
AuthorDate: Tue Nov 23 11:18:23 2021 +0100

    ARTEMIS-3569 Validate users on AMQP remote open
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 17 ++-----
 .../amqp/proton/AMQPConnectionContext.java         | 53 +++++++++++++++++++++-
 .../protocol/amqp/proton/AMQPRedirectHandler.java  |  2 +-
 .../core/protocol/mqtt/MQTTRedirectHandler.java    |  2 +-
 .../management/impl/BrokerBalancerControlImpl.java | 14 +++---
 .../core/impl/ActiveMQRedirectHandler.java         |  2 +-
 .../core/server/balancing/BrokerBalancer.java      |  8 ++--
 .../core/server/balancing/RedirectContext.java     |  2 +-
 .../core/server/balancing/RedirectHandler.java     |  2 +-
 .../server/balancing/targets/TargetResult.java     | 17 +++++--
 .../core/server/impl/ActiveMQServerImpl.java       |  3 ++
 .../core/server/balancing/BrokerBalancerTest.java  |  2 +-
 12 files changed, 87 insertions(+), 37 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index afaa281..c78883a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -58,7 +58,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
-import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -194,30 +193,20 @@ public class AMQPSessionCallback implements SessionCallback {
 
       String name = UUIDGenerator.getInstance().generateStringUUID();
 
-      String user = null;
-      String passcode = null;
-      if (saslResult != null) {
-         user = saslResult.getUser();
-         if (saslResult instanceof PlainSASLResult) {
-            passcode = ((PlainSASLResult) saslResult).getPassword();
-         }
-      }
-
       if (connection.isBridgeConnection())  {
          serverSession = manager.getServer().createInternalSession(name, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
                                                            false, // boolean autoCommitSends
                                                            false, // boolean autoCommitAcks,
                                                            false, // boolean preAcknowledge,
                                                            true, //boolean xa,
-                                                           (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
+                                                           null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
       } else {
-         final String validatedUser = manager.getServer().validateUser(user, passcode, protonSPI.getProtonConnectionDelegate(), manager.getSecurityDomain());
-         serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
+         serverSession = manager.getServer().createSession(name, connection.getUser(), connection.getPassword(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
                                                            false, // boolean autoCommitSends
                                                            false, // boolean autoCommitAcks,
                                                            false, // boolean preAcknowledge,
                                                            true, //boolean xa,
-                                                           (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), validatedUser);
+                                                           null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser());
       }
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index aca83de..f11e6cb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import java.net.URI;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -61,6 +62,7 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
@@ -109,6 +111,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    private final ScheduleOperator scheduleOp = new ScheduleOperator(new ScheduleRunnable());
    private final AtomicReference<Future<?>> scheduledFutureRef = new AtomicReference(VOID_FUTURE);
 
+   private String user;
+   private String password;
+   private String validatedUser;
+
    public AMQPConnectionContext(ProtonProtocolManager protocolManager,
                                 AMQPConnectionCallback connectionSP,
                                 String containerId,
@@ -238,6 +244,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       return handler;
    }
 
+   public String getUser() {
+      return user;
+   }
+
+   public String getPassword() {
+      return password;
+   }
+
+   public String getValidatedUser() {
+      return validatedUser;
+   }
+
    public void destroy() {
       handler.runLater(() -> connectionCallback.close());
    }
@@ -530,8 +548,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
          log.error("Error init connection", e);
       }
 
-      if ((connectionCallback.getTransportConnection().getRedirectTo() != null && protocolManager.getRedirectHandler()
-         .redirect(this, connection)) || !validateConnection(connection)) {
+      if (!validateUser(connection) || (connectionCallback.getTransportConnection().getRedirectTo() != null
+         && protocolManager.getRedirectHandler().redirect(this, connection)) || !validateConnection(connection)) {
          connection.close();
       } else {
          connection.setContext(AMQPConnectionContext.this);
@@ -557,6 +575,37 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       }
    }
 
+   private boolean validateUser(Connection connection) throws Exception {
+      user = null;
+      password = null;
+      validatedUser = null;
+
+      SASLResult saslResult = getSASLResult();
+      if (saslResult != null) {
+         user = saslResult.getUser();
+         if (saslResult instanceof PlainSASLResult) {
+            password = ((PlainSASLResult) saslResult).getPassword();
+         }
+      }
+
+      if (isIncomingConnection() && saslClientFactory == null && !isBridgeConnection()) {
+         try {
+            validatedUser = protocolManager.getServer().validateUser(user, password, connectionCallback.getProtonConnectionDelegate(), protocolManager.getSecurityDomain());
+         } catch (ActiveMQSecurityException e) {
+            log.warn(e.getMessage(), e);
+            ErrorCondition error = new ErrorCondition();
+            error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+            error.setDescription(e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage());
+            connection.setCondition(error);
+            connection.setProperties(Collections.singletonMap(AmqpSupport.CONNECTION_OPEN_FAILED, true));
+
+            return false;
+         }
+      }
+
+      return true;
+   }
+
    class ScheduleOperator implements UnaryOperator<Future<?>> {
 
       private long delay;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
index 31752a8..92abb55 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
@@ -44,7 +44,7 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
    protected void cannotRedirect(AMQPRedirectContext context) {
       ErrorCondition error = new ErrorCondition();
       error.setCondition(ConnectionError.CONNECTION_FORCED);
-      switch (context.getResult().status) {
+      switch (context.getResult().getStatus()) {
          case REFUSED_USE_ANOTHER:
             error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo()));
             break;
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
index 15dcc4a..6565f2a 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
@@ -37,7 +37,7 @@ public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
 
    @Override
    protected void cannotRedirect(MQTTRedirectContext context) {
-      switch (context.getResult().status) {
+      switch (context.getResult().getStatus()) {
          case REFUSED_USE_ANOTHER:
             context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER);
             break;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
index 3747cb7..eb637ec 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
@@ -61,9 +61,9 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
    @Override
    public CompositeData getTarget(String key) throws Exception {
       TargetResult result = balancer.getTarget(key);
-      if (TargetResult.Status.OK == result.status) {
+      if (TargetResult.Status.OK == result.getStatus()) {
          CompositeData connectorData = null;
-         TransportConfiguration connector = result.target.getConnector();
+         TransportConfiguration connector = result.getTarget().getConnector();
 
          if (connector != null) {
             TabularData paramsData = new TabularDataSupport(getParametersType());
@@ -79,7 +79,7 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
 
          return new CompositeDataSupport(getTargetCompositeType(),
                                          new String[]{"nodeID", "local", "connector"},
-                                         new Object[]{result.target.getNodeID(), result.target.isLocal(), connectorData});
+                                         new Object[]{result.getTarget().getNodeID(), result.getTarget().isLocal(), connectorData});
       }
 
       return null;
@@ -88,12 +88,12 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
    @Override
    public String getTargetAsJSON(String key) {
       TargetResult result = balancer.getTarget(key);
-      if (TargetResult.Status.OK == result.status) {
-         TransportConfiguration connector = result.target.getConnector();
+      if (TargetResult.Status.OK == result.getStatus()) {
+         TransportConfiguration connector = result.getTarget().getConnector();
 
          JsonObjectBuilder targetDataBuilder = JsonLoader.createObjectBuilder()
-            .add("nodeID", result.target.getNodeID())
-            .add("local", result.target.isLocal());
+            .add("nodeID", result.getTarget().getNodeID())
+            .add("local", result.getTarget().isLocal());
 
          if (connector == null) {
             targetDataBuilder.addNull("connector");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
index 9be389c..bf775ab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
@@ -40,7 +40,7 @@ public class ActiveMQRedirectHandler extends RedirectHandler<ActiveMQRedirectCon
 
    @Override
    public void cannotRedirect(ActiveMQRedirectContext context) throws Exception {
-      switch (context.getResult().status) {
+      switch (context.getResult().getStatus()) {
          case REFUSED_UNAVAILABLE:
             throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
          case REFUSED_USE_ANOTHER:
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index 2aa69db..c256209 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -68,7 +68,7 @@ public class BrokerBalancer implements ActiveMQComponent {
    }
 
    public Target getLocalTarget() {
-      return localTarget.target;
+      return localTarget.getTarget();
    }
 
    public String getLocalTargetFilter() {
@@ -168,16 +168,16 @@ public class BrokerBalancer implements ActiveMQComponent {
       }
 
       if (result != null) {
-         if (pool.isTargetReady(result.target)) {
+         if (pool.isTargetReady(result.getTarget())) {
             if (logger.isDebugEnabled()) {
-               logger.debug("The cache returns [" + result.target + "] ready for " + targetKey + "[" + key + "]");
+               logger.debug("The cache returns [" + result.getTarget() + "] ready for " + targetKey + "[" + key + "]");
             }
 
             return result;
          }
 
          if (logger.isDebugEnabled()) {
-            logger.debug("The cache returns [" + result.target + "] not ready for " + targetKey + "[" + key + "]");
+            logger.debug("The cache returns [" + result.getTarget() + "] not ready for " + targetKey + "[" + key + "]");
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
index 5534a56..697fcbc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
@@ -43,7 +43,7 @@ public class RedirectContext {
    }
 
    public Target getTarget() {
-      return result.target;
+      return result.getTarget();
    }
 
    public TargetResult getResult() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
index 967e686..0e1ee4c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
@@ -54,7 +54,7 @@ public abstract class RedirectHandler<T extends RedirectContext> {
 
       context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
 
-      if (TargetResult.Status.OK != context.getResult().status) {
+      if (TargetResult.Status.OK != context.getResult().getStatus()) {
          ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection);
 
          cannotRedirect(context);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
index 1be2ff9..cd161c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
@@ -22,16 +22,25 @@ public class TargetResult {
    public static final TargetResult REFUSED_UNAVAILABLE_RESULT = new TargetResult(Status.REFUSED_UNAVAILABLE);
    public static final TargetResult REFUSED_USE_ANOTHER_RESULT = new TargetResult(Status.REFUSED_USE_ANOTHER);
 
-   public Status status;
-   public Target target;
+   private final Status status;
+   private final Target target;
 
-   public TargetResult(Target t) {
-      this.target = t;
+   public Status getStatus() {
+      return status;
+   }
+
+   public Target getTarget() {
+      return target;
+   }
+
+   public TargetResult(Target target) {
       this.status = Status.OK;
+      this.target = target;
    }
 
    private TargetResult(Status s) {
       this.status = s;
+      this.target = null;
    }
 
    public enum Status {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c3f694f..be7a540 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1676,6 +1676,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                       final Map<SimpleString, RoutingType> prefixes,
                                       final String securityDomain,
                                       String validatedUser) throws Exception {
+      if (validatedUser == null) {
+         validatedUser = validateUser(username, password, connection, securityDomain);
+      }
 
       checkSessionLimit(validatedUser);
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index f41aca7..7f8ca0e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -74,7 +74,7 @@ public class BrokerBalancerTest {
 
    @Test
    public void getTarget() {
-      assertEquals( localTarget, underTest.getTarget("FOO_EE").target);
+      assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
       assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
    }