You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by br...@apache.org on 2021/11/24 10:28:25 UTC
[activemq-artemis] 01/02: ARTEMIS-3569 - balancer role_name local target, matches role of authenticated user
This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit b9791891876e0608aaec0750c3c923b53569edbe
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Nov 2 12:12:10 2021 +0000
ARTEMIS-3569 - balancer role_name local target, matches role of authenticated user
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 3 +-
.../protocol/amqp/proton/AMQPRedirectHandler.java | 13 +++-
.../artemis/core/protocol/mqtt/MQTTConnection.java | 1 +
.../core/protocol/mqtt/MQTTConnectionManager.java | 15 ++---
.../core/protocol/mqtt/MQTTProtocolHandler.java | 6 +-
.../core/protocol/mqtt/MQTTRedirectHandler.java | 13 +++-
.../core/protocol/openwire/OpenWireConnection.java | 24 ++++++--
.../protocol/openwire/OpenWireProtocolManager.java | 10 ++--
.../core/protocol/openwire/amq/AMQSession.java | 2 +-
.../openwire/amq/OpenWireConnectionTest.java | 2 +-
.../core/protocol/stomp/StompProtocolManager.java | 3 +-
.../core/management/impl/AbstractControl.java | 3 +-
.../management/impl/BrokerBalancerControlImpl.java | 29 ++++-----
.../protocol/core/impl/ActiveMQPacketHandler.java | 11 ++--
.../core/impl/ActiveMQRedirectHandler.java | 7 ++-
.../core/security/impl/SecurityStoreImpl.java | 2 +-
.../artemis/core/server/ActiveMQMessageBundle.java | 4 ++
.../artemis/core/server/ActiveMQServer.java | 6 +-
.../core/server/balancing/BrokerBalancer.java | 47 ++++++++-------
.../core/server/balancing/RedirectContext.java | 13 ++--
.../core/server/balancing/RedirectHandler.java | 5 +-
.../core/server/balancing/targets/Target.java | 1 +
.../core/server/balancing/targets/TargetKey.java | 4 +-
.../balancing/targets/TargetKeyResolver.java | 32 ++++++++++
.../targets/{TargetKey.java => TargetResult.java} | 41 +++++--------
.../core/server/impl/ActiveMQServerImpl.java | 18 ++++--
.../core/server/balancing/BrokerBalancerTest.java | 6 +-
.../balancing/targets/TargetKeyResolverTest.java | 69 ++++++++++++++++++++++
docs/user-manual/en/broker-balancers.md | 9 +--
pom.xml | 2 +-
.../integration/balancing/AmqpRedirectTest.java | 58 ++++++++++++++++++
.../integration/balancing/BalancingTestBase.java | 28 ++++++++-
.../integration/balancing/MQTTRedirectTest.java | 59 +++++++++++++++---
.../tests/integration/balancing/TargetKeyTest.java | 56 ++++++++++++++++++
34 files changed, 466 insertions(+), 136 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 2f5fa01..afaa281 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -211,12 +211,13 @@ public class AMQPSessionCallback implements SessionCallback {
true, //boolean xa,
(String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
} else {
+ final String validatedUser = manager.getServer().validateUser(user, passcode, protonSPI.getProtonConnectionDelegate(), manager.getSecurityDomain());
serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
false, // boolean autoCommitSends
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
+ (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), validatedUser);
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
index bab287f..31752a8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
@@ -41,10 +41,17 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
}
@Override
- protected void cannotRedirect(AMQPRedirectContext context) throws Exception {
+ protected void cannotRedirect(AMQPRedirectContext context) {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.CONNECTION_FORCED);
- error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
+ switch (context.getResult().status) {
+ case REFUSED_USE_ANOTHER:
+ error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo()));
+ break;
+ case REFUSED_UNAVAILABLE:
+ error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
+ break;
+ }
Connection protonConnection = context.getProtonConnection();
protonConnection.setCondition(error);
@@ -52,7 +59,7 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
}
@Override
- protected void redirectTo(AMQPRedirectContext context) throws Exception {
+ protected void redirectTo(AMQPRedirectContext context) {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index fbc6d8a..66cc1c9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -59,6 +59,7 @@ public class MQTTConnection implements RemotingConnection {
this.creationTime = System.currentTimeMillis();
this.dataReceived = new AtomicBoolean();
this.destroyed = false;
+ transportConnection.setProtocolConnection(this);
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index ab03c29..8bf4d5e 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
-import io.netty.util.CharsetUtil;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
@@ -61,13 +60,13 @@ public class MQTTConnectionManager {
*/
void connect(String cId,
String username,
- byte[] passwordInBytes,
+ String password,
boolean will,
byte[] willMessage,
String willTopic,
boolean willRetain,
int willQosLevel,
- boolean cleanSession) throws Exception {
+ boolean cleanSession, String validatedUser) throws Exception {
String clientId = validateClientId(cId, cleanSession);
if (clientId == null) {
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
@@ -79,11 +78,10 @@ public class MQTTConnectionManager {
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
- String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8);
session.getConnection().setClientID(clientId);
- ServerSessionImpl serverSession = createServerSession(username, password);
+ ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
- ServerSessionImpl internalServerSession = createServerSession(username, password);
+ ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);
@@ -120,10 +118,9 @@ public class MQTTConnectionManager {
* @return
* @throws Exception
*/
- ServerSessionImpl createServerSession(String username, String password) throws Exception {
+ ServerSessionImpl createServerSession(String username, String password, String validatedUser) throws Exception {
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
-
ServerSession serverSession = server.createSession(id,
username,
password,
@@ -138,7 +135,7 @@ public class MQTTConnectionManager {
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
server.newOperationContext(),
session.getProtocolManager().getPrefixes(),
- session.getProtocolManager().getSecurityDomain());
+ session.getProtocolManager().getSecurityDomain(), validatedUser);
return (ServerSessionImpl) serverSession;
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index f33c5de..e7f343d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -38,6 +38,7 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AuditLogger;
@@ -177,12 +178,15 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
* @param connect
*/
void handleConnect(MqttConnectMessage connect) throws Exception {
+ final String username = connect.payload().userName();
+ final String password = connect.payload().passwordInBytes() == null ? null : new String( connect.payload().passwordInBytes(), CharsetUtil.UTF_8);
+ final String validatedUser = server.validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain());
if (connection.getTransportConnection().getRedirectTo() == null ||
!protocolManager.getRedirectHandler().redirect(connection, session, connect)) {
connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;
String clientId = connect.payload().clientIdentifier();
- session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
+ session.getConnectionManager().connect(clientId, username, password, connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession(), validatedUser);
}
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
index 3b37203..15dcc4a 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java
@@ -36,13 +36,20 @@ public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
}
@Override
- protected void cannotRedirect(MQTTRedirectContext context) throws Exception {
- context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+ protected void cannotRedirect(MQTTRedirectContext context) {
+ switch (context.getResult().status) {
+ case REFUSED_USE_ANOTHER:
+ context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER);
+ break;
+ case REFUSED_UNAVAILABLE:
+ context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+ break;
+ }
context.getMQTTSession().getProtocolHandler().disconnect(true);
}
@Override
- protected void redirectTo(MQTTRedirectContext context) throws Exception {
+ protected void redirectTo(MQTTRedirectContext context) {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 9e8dee7..c0c2a2f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -197,6 +197,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final AtomicBoolean disableTtl = new AtomicBoolean(false);
+ private String validatedUser = null;
+
public OpenWireConnection(Connection connection,
ActiveMQServer server,
OpenWireProtocolManager openWireProtocolManager,
@@ -210,6 +212,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.outWireFormat = wf.copy();
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
+ this.transportConnection.setProtocolConnection(this);
}
// SecurityAuth implementation
@@ -768,7 +771,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void createInternalSession(ConnectionInfo info) throws Exception {
- internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
+ internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser);
}
//raise the refCount of context
@@ -1010,6 +1013,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return protocolManager.isSupportAdvisory();
}
+ public String getValidatedUser() {
+ return validatedUser;
+ }
+
+ public void setValidatedUser(String validatedUser) {
+ this.validatedUser = validatedUser;
+ }
+
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override
@@ -1126,12 +1137,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processAddConnection(ConnectionInfo info) throws Exception {
try {
- if (transportConnection.getRedirectTo() != null && protocolManager.getRedirectHandler()
- .redirect(OpenWireConnection.this, info)) {
- shutdown(true);
- return null;
+ protocolManager.validateUser(OpenWireConnection.this, info);
+ if (transportConnection.getRedirectTo() != null) {
+ if (protocolManager.getRedirectHandler().redirect(OpenWireConnection.this, info)) {
+ shutdown(true);
+ return null;
+ }
}
-
protocolManager.addConnection(OpenWireConnection.this, info);
} catch (Exception e) {
Response resp = new ExceptionResponse(e);
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 0ce2093..8433c9c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -357,19 +357,21 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
return websocketRegistryNames;
}
- public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
+ public void validateUser(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();
try {
- validateUser(username, password, connection);
+ connection.setValidatedUser(validateUser(username, password, connection));
} catch (ActiveMQSecurityException e) {
// We need to send an exception used by the openwire
SecurityException ex = new SecurityException("User name [" + username + "] or password is invalid.");
ex.initCause(e);
throw ex;
}
+ }
+ public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String clientId = info.getClientId();
if (clientId == null) {
throw new InvalidClientIDException("No clientID specified for connection request");
@@ -529,8 +531,8 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
return false;
}
- public void validateUser(String login, String passcode, OpenWireConnection connection) throws Exception {
- server.getSecurityStore().authenticate(login, passcode, connection, getSecurityDomain());
+ public String validateUser(String login, String passcode, OpenWireConnection connection) throws Exception {
+ return server.validateUser(login, passcode, connection, getSecurityDomain());
}
public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e3e0b0e..af6e04a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -133,7 +133,7 @@ public class AMQSession implements SessionCallback {
// now
try {
- coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
+ coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), connection.getValidatedUser());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.error("error init session", e);
}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
index c64ecc1..1ad11cd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
@@ -72,7 +72,7 @@ public class OpenWireConnectionTest {
ServerSession serverSession = Mockito.mock(ServerSession.class);
Mockito.when(serverSession.getName()).thenReturn("session");
Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString());
+ Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any());
OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
openWireProtocolManager.setSecurityDomain("securityDomain");
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 46e5eea..754d207 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -230,7 +230,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes(), getSecurityDomain());
+ final String validatedUser = server.validateUser(connection.getLogin(), connection.getPasscode(), connection, getSecurityDomain());
+ ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes(), getSecurityDomain(), validatedUser);
stompSession.setServerSession(session);
sessions.put(id, stompSession);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
index cb319a2..98e966e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
@@ -117,11 +117,12 @@ public abstract class AbstractControl extends StandardMBean {
boolean createMessageId,
Long...queueID) throws Exception {
ManagementRemotingConnection fakeConnection = new ManagementRemotingConnection();
+ final String validatedUser = server.validateUser(user, password, fakeConnection, null);
ServerSession serverSession = server.createSession("management::" + UUIDGenerator.getInstance().generateStringUUID(), user, password,
Integer.MAX_VALUE, fakeConnection,
true, true, false,
false, address.toString(), fakeConnection.callback,
- false, new DummyOperationContext(), Collections.emptyMap(), null);
+ false, new DummyOperationContext(), Collections.emptyMap(), null, validatedUser);
try {
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
index 298c956..3747cb7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
@@ -59,17 +60,16 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public CompositeData getTarget(String key) throws Exception {
- Target target = balancer.getTarget(key);
-
- if (target != null) {
+ TargetResult result = balancer.getTarget(key);
+ if (TargetResult.Status.OK == result.status) {
CompositeData connectorData = null;
- TransportConfiguration connector = target.getConnector();
+ TransportConfiguration connector = result.target.getConnector();
if (connector != null) {
TabularData paramsData = new TabularDataSupport(getParametersType());
for (Map.Entry<String, Object> param : connector.getParams().entrySet()) {
paramsData.put(new CompositeDataSupport(getParameterType(), new String[]{"key", "value"},
- new Object[]{param.getKey(), param == null ? param : param.getValue().toString()}));
+ new Object[]{param.getKey(), param.getValue() == null ? null : param.getValue().toString()}));
}
connectorData = new CompositeDataSupport(getTransportConfigurationType(),
@@ -77,11 +77,9 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
new Object[]{connector.getName(), connector.getFactoryClassName(), paramsData});
}
- CompositeData targetData = new CompositeDataSupport(getTargetCompositeType(),
- new String[]{"nodeID", "local", "connector"},
- new Object[]{target.getNodeID(), target.isLocal(), connectorData});
-
- return targetData;
+ return new CompositeDataSupport(getTargetCompositeType(),
+ new String[]{"nodeID", "local", "connector"},
+ new Object[]{result.target.getNodeID(), result.target.isLocal(), connectorData});
}
return null;
@@ -89,14 +87,13 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public String getTargetAsJSON(String key) {
- Target target = balancer.getTarget(key);
-
- if (target != null) {
- TransportConfiguration connector = target.getConnector();
+ TargetResult result = balancer.getTarget(key);
+ if (TargetResult.Status.OK == result.status) {
+ TransportConfiguration connector = result.target.getConnector();
JsonObjectBuilder targetDataBuilder = JsonLoader.createObjectBuilder()
- .add("nodeID", target.getNodeID())
- .add("local", target.isLocal());
+ .add("nodeID", result.target.getNodeID())
+ .add("local", result.target.isLocal());
if (connector == null) {
targetDataBuilder.addNull("connector");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index e63dbf5..a2322a9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -163,10 +163,6 @@ public class ActiveMQPacketHandler implements ChannelHandler {
connection.setClientID(((CreateSessionMessage_V2) request).getClientID());
}
- if (connection.getTransportConnection().getRedirectTo() != null) {
- protocolManager.getRedirectHandler().redirect(connection, request);
- }
-
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
ActiveMQPrincipal activeMQPrincipal = null;
@@ -175,12 +171,17 @@ public class ActiveMQPacketHandler implements ChannelHandler {
activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
}
+ final String validatedUser = server.validateUser(activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), connection, protocolManager.getSecurityDomain());
+ if (connection.getTransportConnection().getRedirectTo() != null) {
+ protocolManager.getRedirectHandler().redirect(connection, request);
+ }
+
OperationContext sessionOperationContext = server.newOperationContext();
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
- ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolM [...]
+ ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolM [...]
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
session.addProducer(serverProducer);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, session, channel);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
index 937bd27..9be389c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java
@@ -40,7 +40,12 @@ public class ActiveMQRedirectHandler extends RedirectHandler<ActiveMQRedirectCon
@Override
public void cannotRedirect(ActiveMQRedirectContext context) throws Exception {
- throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
+ // Before this change the method threw unconditionally; keep that guarantee so a
+ // refusal can never return normally, whatever status the balancer reported.
+ switch (context.getResult().status) {
+ case REFUSED_USE_ANOTHER:
+ // non-transient rejection: the client should go to another broker
+ throw ActiveMQMessageBundle.BUNDLE.balancerReject();
+ case REFUSED_UNAVAILABLE:
+ default:
+ // possibly transient; also covers any status without a dedicated message
+ throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
+ }
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
index 188692a..39257f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/impl/SecurityStoreImpl.java
@@ -190,7 +190,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
authenticationFailed(user, connection);
}
- if (AuditLogger.isAnyLoggingEnabled() && connection != null) {
+ if (connection != null) {
connection.setAuditSubject(subject);
}
if (AuditLogger.isResourceLoggingEnabled()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 454d5fe..454c44c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQRedirectedException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
@@ -518,4 +519,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229239, value = "There is no retention configured. In order to use the replay method you must specify journal-retention-directory element on the broker.xml")
IllegalArgumentException noRetention();
+
+ @Message(id = 229240, value = "Balancer rejected the connection")
+ ActiveMQRemoteDisconnectException balancerReject();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 7f551d4..35d4f63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -338,7 +338,7 @@ public interface ActiveMQServer extends ServiceComponent {
boolean autoCreateQueues,
OperationContext context,
Map<SimpleString, RoutingType> prefixes,
- String securityDomain) throws Exception;
+ String securityDomain, String validatedUser) throws Exception;
/** This is to be used in places where security is bypassed, like internal sessions, broker connections, etc... */
ServerSession createInternalSession(String name,
@@ -959,4 +959,6 @@ public interface ActiveMQServer extends ServiceComponent {
void reloadConfigurationFile() throws Exception;
BrokerBalancerManager getBalancerManager();
-}
+
+ String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception;
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index 512056f..2aa69db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
@@ -46,7 +47,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final TargetKeyResolver targetKeyResolver;
- private final Target localTarget;
+ private final TargetResult localTarget;
private final Pattern localTargetFilter;
@@ -54,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final Policy policy;
- private final Cache<String, Target> cache;
+ private final Cache<String, TargetResult> cache;
private volatile boolean started = false;
@@ -67,7 +68,7 @@ public class BrokerBalancer implements ActiveMQComponent {
}
public Target getLocalTarget() {
- return localTarget;
+ return localTarget.target;
}
public String getLocalTargetFilter() {
@@ -82,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return policy;
}
- public Cache<String, Target> getCache() {
+ public Cache<String, TargetResult> getCache() {
return cache;
}
@@ -99,7 +100,7 @@ public class BrokerBalancer implements ActiveMQComponent {
this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter);
- this.localTarget = localTarget;
+ this.localTarget = new TargetResult(localTarget);
this.localTargetFilter = localTargetFilter != null ? Pattern.compile(localTargetFilter) : null;
@@ -134,7 +135,7 @@ public class BrokerBalancer implements ActiveMQComponent {
}
}
- public Target getTarget(Connection connection, String clientID, String username) {
+ public TargetResult getTarget(Connection connection, String clientID, String username) {
if (clientID != null && clientID.startsWith(BrokerBalancer.CLIENT_ID_PREFIX)) {
if (logger.isDebugEnabled()) {
logger.debug("The clientID [" + clientID + "] starts with BrokerBalancer.CLIENT_ID_PREFIX");
@@ -146,7 +147,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return getTarget(targetKeyResolver.resolve(connection, clientID, username));
}
- public Target getTarget(String key) {
+ public TargetResult getTarget(String key) {
if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
if (logger.isDebugEnabled()) {
@@ -157,45 +158,51 @@ public class BrokerBalancer implements ActiveMQComponent {
}
+ // Without a pool there is nothing to balance to: refuse and point the client elsewhere.
if (pool == null) {
- return null;
+ return TargetResult.REFUSED_USE_ANOTHER_RESULT;
}
- Target target = null;
+ TargetResult result = null;
if (cache != null) {
- target = cache.getIfPresent(key);
+ result = cache.getIfPresent(key);
}
- if (target != null) {
- if (pool.isTargetReady(target)) {
+ // A cached result is reused only while the pool still reports its target as ready.
+ if (result != null) {
+ if (pool.isTargetReady(result.target)) {
if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + target + "] ready for " + targetKey + "[" + key + "]");
+ logger.debug("The cache returns [" + result.target + "] ready for " + targetKey + "[" + key + "]");
}
- return target;
+ return result;
}
if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + target + "] not ready for " + targetKey + "[" + key + "]");
+ logger.debug("The cache returns [" + result.target + "] not ready for " + targetKey + "[" + key + "]");
}
}
List<Target> targets = pool.getTargets();
- target = policy.selectTarget(targets, key);
+ Target target = policy.selectTarget(targets, key);
if (logger.isDebugEnabled()) {
logger.debug("The policy selects [" + target + "] from " + targets + " for " + targetKey + "[" + key + "]");
}
- if (target != null && cache != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
+ if (target != null) {
+ result = new TargetResult(target);
+ if (cache != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
+ }
+ cache.put(key, result);
}
-
- cache.put(key, target);
}
- return target;
+ // Judge success by the fresh selection: result may still hold a cached entry whose
+ // target was just found not ready, and that must not be handed back as a success.
+ return target != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
index 76a2a54..5534a56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java
@@ -18,6 +18,7 @@
package org.apache.activemq.artemis.core.server.balancing;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class RedirectContext {
@@ -27,7 +28,7 @@ public class RedirectContext {
private final String username;
- private Target target;
+ private TargetResult result;
public RemotingConnection getConnection() {
return connection;
@@ -42,11 +43,15 @@ public class RedirectContext {
}
public Target getTarget() {
- return target;
+ // The result field is declared without an initial value, so it may still be null
+ // here; keep returning null in that case, as the removed code did for an unset target.
+ return result != null ? result.target : null;
}
- public void setTarget(Target target) {
- this.target = target;
+ public TargetResult getResult() {
+ return result;
+ }
+
+ public void setResult(TargetResult result) {
+ this.result = result;
}
public RedirectContext(RemotingConnection connection, String clientID, String username) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
index 89ace13..967e686 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.balancing;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
public abstract class RedirectHandler<T extends RedirectContext> {
@@ -51,9 +52,9 @@ public abstract class RedirectHandler<T extends RedirectContext> {
return true;
}
- context.setTarget(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
+ context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
- if (context.getTarget() == null) {
+ if (TargetResult.Status.OK != context.getResult().status) {
ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection);
cannotRedirect(context);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java
index f82331b..fb09d87 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java
@@ -56,4 +56,5 @@ public interface Target {
<T> T getAttribute(String resourceName, String attributeName, Class<T> attributeClass, int timeout) throws Exception;
<T> T invokeOperation(String resourceName, String operationName, Object[] operationParams, Class<T> operationClass, int timeout) throws Exception;
+
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java
index d01b932..3493ec3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java
@@ -18,7 +18,7 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
public enum TargetKey {
- CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME;
+ CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME, ROLE_NAME;
public static final String validValues;
@@ -46,6 +46,8 @@ public enum TargetKey {
return SOURCE_IP;
case "USER_NAME":
return USER_NAME;
+ case "ROLE_NAME":
+ return ROLE_NAME;
default:
throw new IllegalStateException("Invalid RedirectKey:" + type + " valid Types: " + validValues);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java
index dac82de..6a409d0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java
@@ -17,7 +17,10 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
+import javax.security.auth.Subject;
+
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.jboss.logging.Logger;
import java.util.regex.Matcher;
@@ -80,6 +83,35 @@ public class TargetKeyResolver {
case USER_NAME:
keyValue = username;
break;
+ case ROLE_NAME:
+ if (connection != null && connection.getProtocolConnection() != null) {
+ Subject subject = connection.getProtocolConnection().getAuditSubject();
+ if (subject != null) {
+ for (RolePrincipal candidateRole : subject.getPrincipals(RolePrincipal.class)) {
+ String roleName = candidateRole.getName();
+ if (roleName != null) {
+ if (keyFilter != null) {
+ Matcher keyMatcher = keyFilter.matcher(roleName);
+ if (keyMatcher.find()) {
+ keyValue = keyMatcher.group();
+ if (logger.isDebugEnabled()) {
+ logger.debugf("role match for %s via %s", roleName, keyMatcher);
+ }
+ return keyValue;
+ }
+ } else {
+ // with no filter, first role is the candidate
+ keyValue = roleName;
+ if (logger.isDebugEnabled()) {
+ logger.debugf("first role match: %s", roleName);
+ }
+ return keyValue;
+ }
+ }
+ }
+ }
+ }
+ break;
default:
throw new IllegalStateException("Unexpected value: " + key);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
similarity index 51%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
index d01b932..1be2ff9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java
@@ -17,37 +17,26 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
-public enum TargetKey {
- CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME;
+public class TargetResult {
- public static final String validValues;
+ public static final TargetResult REFUSED_UNAVAILABLE_RESULT = new TargetResult(Status.REFUSED_UNAVAILABLE);
+ public static final TargetResult REFUSED_USE_ANOTHER_RESULT = new TargetResult(Status.REFUSED_USE_ANOTHER);
- static {
- StringBuffer stringBuffer = new StringBuffer();
- for (TargetKey type : TargetKey.values()) {
+ public Status status;
+ public Target target;
- if (stringBuffer.length() != 0) {
- stringBuffer.append(",");
- }
-
- stringBuffer.append(type.name());
- }
+ public TargetResult(Target t) {
+ this.target = t;
+ this.status = Status.OK;
+ }
- validValues = stringBuffer.toString();
+ private TargetResult(Status s) {
+ this.status = s;
}
- public static TargetKey getType(String type) {
- switch (type) {
- case "CLIENT_ID":
- return CLIENT_ID;
- case "SNI_HOST":
- return SNI_HOST;
- case "SOURCE_IP":
- return SOURCE_IP;
- case "USER_NAME":
- return USER_NAME;
- default:
- throw new IllegalStateException("Invalid RedirectKey:" + type + " valid Types: " + validValues);
- }
+ /** Outcome of a target lookup; the REFUSED values are used by the shared results that carry no target. */
+ public enum Status {
+ OK, // set by the constructor that takes a Target
+ REFUSED_UNAVAILABLE, // pool is not yet ready, possibly transient
+ REFUSED_USE_ANOTHER // rejected, go elsewhere, non-transient
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 9cfbd34..c3f694f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1674,12 +1674,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean autoCreateQueues,
final OperationContext context,
final Map<SimpleString, RoutingType> prefixes,
- final String securityDomain) throws Exception {
- String validatedUser = "";
-
- if (securityStore != null) {
- validatedUser = securityStore.authenticate(username, password, connection, securityDomain);
- }
+ final String securityDomain,
+ String validatedUser) throws Exception {
checkSessionLimit(validatedUser);
@@ -1694,6 +1690,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception {
+ // No security store: there is nothing to authenticate against, report an empty user.
+ if (securityStore == null) {
+ return "";
+ }
+
+ // Otherwise the store decides and its answer is passed straight back to the caller.
+ return securityStore.authenticate(username, password, connection, securityDomain);
+ }
+
+ @Override
public ServerSession createInternalSession(String name,
int minLargeMessageSize,
RemotingConnection connection,
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index a59307d..f41aca7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -18,7 +18,6 @@
package org.apache.activemq.artemis.core.server.balancing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -29,6 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -74,8 +74,8 @@ public class BrokerBalancerTest {
@Test
public void getTarget() {
- assertEquals( localTarget, underTest.getTarget("FOO_EE"));
- assertNotEquals( localTarget, underTest.getTarget("BAR_EE"));
+ // presumably FOO_EE matches the local target filter configured by the fixture (set-up not visible in this hunk)
+ assertEquals( localTarget, underTest.getTarget("FOO_EE").target);
+ // any other key must come back as the shared REFUSED_USE_ANOTHER result, not merely "not local"
+ assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}
}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java
index 5933696..df741f0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java
@@ -17,7 +17,15 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
+import javax.security.auth.Subject;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.commons.collections.set.ListOrderedSet;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -107,4 +115,65 @@ public class TargetKeyResolverTest {
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(null, null, null));
}
+
+ // Walks the ROLE_NAME resolver through each stage of an increasingly complete connection,
+ // with a key filter of "B": the default key is expected until a role matching the filter exists.
+ @Test
+ public void testRoleNameKeyWithFilter() throws Exception {
+ TargetKeyResolver targetKeyResolver = new TargetKeyResolver(TargetKey.ROLE_NAME, "B");
+
+ // stage 1: transport connection without a protocol connection
+ Connection connection = Mockito.mock(Connection.class);
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ // stage 2: protocol connection with a subject, but no role principals stubbed yet
+ RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class);
+ Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection);
+ Subject subject = Mockito.mock(Subject.class);
+ Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject);
+
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ // stage 3: an explicit, still empty, set of roles
+ Set<RolePrincipal> rolePrincipals = new HashSet<>();
+ Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals);
+
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ // stage 4: role "A" is present but does not match the filter
+ rolePrincipals.add(new RolePrincipal("A"));
+
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ // stage 5: role "B" matches the filter and becomes the key
+ rolePrincipals.add(new RolePrincipal("B"));
+
+ Assert.assertEquals("B", targetKeyResolver.resolve(connection, null, null));
+ }
+
+ // Same walk-through as the filtered case but with no key filter: once any role is present,
+ // the first role returned by the subject is expected to be the key.
+ @Test
+ public void testRoleNameKeyWithoutFilter() throws Exception {
+ TargetKeyResolver targetKeyResolver = new TargetKeyResolver(TargetKey.ROLE_NAME, null);
+
+ // transport connection without a protocol connection
+ Connection connection = Mockito.mock(Connection.class);
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ // protocol connection with a subject, but no role principals stubbed yet
+ RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class);
+ Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection);
+ Subject subject = Mockito.mock(Subject.class);
+ Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject);
+
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ // NOTE(review): an ordered set is used (presumably insertion order) so that "first role" is deterministic - confirm
+ Set<RolePrincipal> rolePrincipals = new ListOrderedSet();
+ Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals);
+
+ Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
+
+ final RolePrincipal roleA = new RolePrincipal("A");
+ rolePrincipals.add(roleA);
+
+ Assert.assertEquals("A", targetKeyResolver.resolve(connection, null, null));
+
+ // adding a second role must not change the outcome while "A" is still first
+ rolePrincipals.add(new RolePrincipal("B"));
+
+ Assert.assertEquals("A", targetKeyResolver.resolve(connection, null, null));
+
+ rolePrincipals.remove(roleA);
+ // with no filter, the first entry matches
+ Assert.assertEquals("B", targetKeyResolver.resolve(connection, null, null));
+ }
}
diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md
index 4605d85..be42656 100644
--- a/docs/user-manual/en/broker-balancers.md
+++ b/docs/user-manual/en/broker-balancers.md
@@ -13,10 +13,11 @@ The remote target is another reachable broker.
## Target Key
The broker balancer uses a target key to select a target broker.
It is a string retrieved from an incoming client connection, the supported values are:
-* `CLIENT_ID` is the JMS client ID;
-* `SNI_HOST` is the hostname indicated by the client in the SNI extension of the TLS protocol;
-* `SOURCE_IP` is the source IP address of the client;
+* `CLIENT_ID` is the JMS client ID.
+* `SNI_HOST` is the hostname indicated by the client in the SNI extension of the TLS protocol.
+* `SOURCE_IP` is the source IP address of the client.
* `USER_NAME` is the username indicated by the client.
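+* `ROLE_NAME` is a role associated with the authenticated user of the connection; when a `target-key-filter` is set, the first role it matches is used, otherwise the first role found.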
## Pools
The pool is a group of target brokers with periodic checks on their state.
@@ -108,7 +109,7 @@ for more details about setting the `cache-timeout` parameter.
## Defining broker balancers
A broker balancer is defined by the `broker-balancer` element, it includes the following items:
* the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor;
-* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
+* the `target-key` element defines which key is used to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
diff --git a/pom.xml b/pom.xml
index fa6438a..941778a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -207,7 +207,7 @@
<activemq-surefire-argline>-Dorg.apache.commons.logging.Log=org.apache.activemq.artemis.logs.JBossLoggingApacheLoggerBridge -Dorg.apache.activemq.artemis.utils.RetryRule.retry=${retryTests} -Dbrokerconfig.maxDiskUsage=100 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration="file:${activemq.basedir}/tests/config/${logging.config}"
- -Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
+ -Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost
-Djava.net.preferIPv4Stack=true -Dbasedir=${basedir}
</activemq-surefire-argline>
<activemq.basedir>${project.basedir}</activemq.basedir>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
index e376848..03b4101 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
@@ -189,4 +189,62 @@ public class AmqpRedirectTest extends BalancingTestBase {
stopServers(0, 1);
}
+
+ // Connects an AMQP client to a single broker whose balancer is keyed on ROLE_NAME with filter "B"
+ // and verifies that the connection is refused with the balancer-specific error, not redirected.
+ @Test
+ public void testBalancerRejectionUseAnother() throws Exception {
+ setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+
+ // only accepts users with RoleName==B so will reject
+ setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "B", null);
+
+ startServers(0);
+
+ URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
+ AmqpClient client = new AmqpClient(uri, "admin", "admin");
+
+ AmqpConnection connection = client.createConnection();
+ connection.setContainerId(getName());
+
+ // The inspector records protocol-level expectations; they are checked by assertValid() below.
+ connection.setStateInspector(new AmqpValidator() {
+
+ // the open frame must carry the "connection open failed" hint
+ @Override
+ public void inspectOpenedResource(Connection connection) {
+ if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
+ markAsInvalid("Broker did not set connection establishment failed hint");
+ }
+ }
+
+ // the close frame must carry CONNECTION_FORCED and the balancer rejection description
+ @Override
+ public void inspectClosedResource(Connection connection) {
+ ErrorCondition remoteError = connection.getRemoteCondition();
+ if (remoteError == null || remoteError.getCondition() == null) {
+ markAsInvalid("Broker did not add error condition for connection");
+ return;
+ }
+
+ if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) {
+ markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED);
+ return;
+ }
+ String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + ", rejected this connection";
+ String actualDescription = remoteError.getDescription();
+ if (!expectedDescription.equals(actualDescription)) {
+ markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
+ return;
+ }
+ }
+ });
+
+ // fail() raises an AssertionError, which is not an Exception, so the catch below does not hide it
+ try {
+ connection.connect();
+ fail("Expected connection to fail, without redirect");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+
+ stopServers(0);
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
index db7a849..890a813 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
@@ -137,7 +137,25 @@ public class BalancingTestBase extends ClusterTestBase {
acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME);
}
+
+ protected void setupBalancerServerWithLocalTarget(final int node, final TargetKey targetKey, final String targetKeyFilter, final String localTargetFilter) {
+ // Balancer definition: only the name, the key and the two filters are set here.
+ BrokerBalancerConfiguration balancer = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME);
+ balancer.setTargetKey(targetKey)
+ .setLocalTargetFilter(localTargetFilter)
+ .setTargetKeyFilter(targetKeyFilter);
+
+ // Install it as the only balancer of the given node.
+ getServer(node).getConfiguration().setBalancerConfigurations(Collections.singletonList(balancer));
+
+ // Point the node's default acceptor at the balancer by name.
+ getDefaultServerAcceptor(node).getParams().put("redirect-to", BROKER_BALANCER_NAME);
+ }
+
protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password) throws Exception {
+ return createFactory(protocol, sslEnabled, host, port, clientID, user, password, -1);
+ }
+
+ protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password, int retries) throws Exception {
switch (protocol) {
case CORE_PROTOCOL: {
StringBuilder urlBuilder = new StringBuilder();
@@ -146,7 +164,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(host);
urlBuilder.append(":");
urlBuilder.append(port);
- urlBuilder.append("?ha=true&reconnectAttempts=30");
+ urlBuilder.append("?ha=true&reconnectAttempts=10&initialConnectAttempts=" + retries);
urlBuilder.append("&sniHost=");
urlBuilder.append(host);
@@ -197,8 +215,10 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
+ urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
+
if (clientID != null) {
- urlBuilder.append("?jms.clientID=");
+ urlBuilder.append("&jms.clientID=");
urlBuilder.append(clientID);
}
@@ -223,8 +243,10 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
+ urlBuilder.append("?startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries);
+
if (clientID != null) {
- urlBuilder.append("?jms.clientID=");
+ urlBuilder.append("&jms.clientID=");
urlBuilder.append(clientID);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java
index acef93c..da8a17f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java
@@ -24,8 +24,11 @@ import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -37,6 +40,8 @@ import org.junit.Test;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
+import java.lang.management.ManagementFactory;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -44,7 +49,16 @@ import java.util.concurrent.TimeUnit;
public class MQTTRedirectTest extends BalancingTestBase {
- private final boolean discovery = true;
+ static {
+    // Point JAAS at the login.config found on the class path, unless the JVM was already
+    // started with an explicit java.security.auth.login.config.
+    if (System.getProperty("java.security.auth.login.config") == null) {
+       URL resource = SecurityTest.class.getClassLoader().getResource("login.config");
+       if (resource != null) {
+          String path;
+          try {
+             // URL.getFile() hands back the path still percent-encoded (a space shows up as %20),
+             // which does not name a real file; going through the URI gives the decoded path.
+             path = new java.io.File(resource.toURI()).getPath();
+          } catch (java.net.URISyntaxException | IllegalArgumentException e) {
+             // not a plain file: URL, or not convertible to a URI: keep the raw value as before
+             path = resource.getFile();
+          }
+          System.setProperty("java.security.auth.login.config", path);
+       }
+    }
+ }
@Test
public void testSimpleRedirect() throws Exception {
@@ -52,11 +66,7 @@ public class MQTTRedirectTest extends BalancingTestBase {
setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
- if (discovery) {
- setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1);
- } else {
- setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1);
- }
+ setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1);
startServers(0, 1);
@@ -94,7 +104,7 @@ public class MQTTRedirectTest extends BalancingTestBase {
CompositeData hostData = targetConnectorParams.get(new Object[]{TransportConstants.HOST_PROP_NAME});
CompositeData portData = targetConnectorParams.get(new Object[]{TransportConstants.PORT_PROP_NAME});
String host = hostData != null ? (String)hostData.get("value") : TransportConstants.DEFAULT_HOST;
- int port = portData != null ? Integer.valueOf((String)portData.get("value")) : TransportConstants.DEFAULT_PORT;
+ int port = portData != null ? Integer.parseInt((String)portData.get("value")) : TransportConstants.DEFAULT_PORT;
CountDownLatch latch = new CountDownLatch(1);
List<MqttMessage> messages = new ArrayList<>();
@@ -119,7 +129,40 @@ public class MQTTRedirectTest extends BalancingTestBase {
client1.close();
Assert.assertEquals(0, queueControl0.countMessages());
- Wait.assertEquals(0, () -> queueControl1.countMessages());
+ Wait.assertEquals(0, (Wait.LongCondition) queueControl1::countMessages);
+ }
+
+ /**
+  * Starts one security-enabled broker whose balancer keys on ROLE_NAME with "b" as both the
+  * target key filter and the local target filter, then checks that user "a" is refused with
+  * CONNECTION_REFUSED_USE_ANOTHER_SERVER while user "b" can connect.
+  */
+ @Test
+ public void testRoleNameKeyLocalTarget() throws Exception {
+    ActiveMQJAASSecurityManager jaasSecurityManager = new ActiveMQJAASSecurityManager("PropertiesLogin");
+    servers[0] = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(true).setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), jaasSecurityManager, false));
+    setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "b", "b");
+    startServers(0);
+
+    // both clients talk to the same default acceptor
+    final String brokerUri = "tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_PORT;
+
+    MqttConnectOptions options = new MqttConnectOptions();
+    options.setCleanSession(true);
+    options.setUserName("a");
+    options.setPassword("a".toCharArray());
+
+    // user a: the connect attempt must come back with "use another server"
+    MqttClient rejectedClient = new MqttClient(brokerUri, "TEST", new MemoryPersistence());
+    try {
+       rejectedClient.connect(options);
+       fail("Expect to be rejected as not in role b");
+    } catch (MqttException e) {
+       MqttConnectReturnCode returnCode = MqttConnectReturnCode.valueOf((byte) e.getReasonCode());
+       Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER, returnCode);
+    }
+    rejectedClient.close();
+
+    // user b: expect to be accepted, b has role b
+    options.setUserName("b");
+    options.setPassword("b".toCharArray());
+    MqttClient acceptedClient = new MqttClient(brokerUri, "TEST", new MemoryPersistence());
+    acceptedClient.connect(options);
+    acceptedClient.disconnect();
+    acceptedClient.close();
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
index 901941b..5ccd8c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
@@ -18,12 +18,17 @@
package org.apache.activemq.artemis.tests.integration.balancing;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
@@ -33,12 +38,16 @@ import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
+import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
@RunWith(Parameterized.class)
public class TargetKeyTest extends BalancingTestBase {
@@ -57,6 +66,17 @@ public class TargetKeyTest extends BalancingTestBase {
}
+ static {
+    // Use the login.config from the class path for JAAS when no
+    // java.security.auth.login.config was supplied to the JVM.
+    if (System.getProperty("java.security.auth.login.config") == null) {
+       URL resource = SecurityTest.class.getClassLoader().getResource("login.config");
+       if (resource != null) {
+          String path;
+          try {
+             // The string from URL.getFile() is still percent-encoded (e.g. %20 for a space) and so
+             // may not match the file on disk; converting via the URI yields the decoded path.
+             path = new java.io.File(resource.toURI()).getPath();
+          } catch (java.net.URISyntaxException | IllegalArgumentException e) {
+             // resource is not addressable as a local file: fall back to the previous raw value
+             path = resource.getFile();
+          }
+          System.setProperty("java.security.auth.login.config", path);
+       }
+    }
+ }
+
private final String protocol;
private final List<String> keys = new ArrayList<>();
@@ -174,6 +194,42 @@ public class TargetKeyTest extends BalancingTestBase {
Assert.assertEquals("admin", keys.get(0));
}
+ /**
+  * Runs against one security-enabled broker whose balancer keys on ROLE_NAME with "b" as both
+  * the target key filter and the local target filter: a connection as user "a" must fail, a
+  * connection as user "b" must start normally.
+  */
+ @Test
+ public void testRoleNameKeyLocalTarget() throws Exception {
+    ActiveMQJAASSecurityManager jaasSecurityManager = new ActiveMQJAASSecurityManager("PropertiesLogin");
+    servers[0] = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(true).setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), jaasSecurityManager, false));
+    setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "b", "b");
+
+    // ensure advisory permission is present for openwire connection creation by 'b'
+    Set<Role> advisoryRoles = new HashSet<>();
+    advisoryRoles.add(new Role("b", true, true, true, true, true, true, false, false, true, true));
+    HierarchicalRepository<Set<Role>> securityRepository = servers[0].getSecurityRepository();
+    securityRepository.addMatch("ActiveMQ.Advisory.#", advisoryRoles);
+
+    startServers(0);
+
+    final String host = TransportConstants.DEFAULT_HOST;
+    final int port = TransportConstants.DEFAULT_PORT;
+
+    // zero retries, so that the rejection surfaces as an error
+    final int noRetries = 0;
+    ConnectionFactory rejectedFactory = createFactory(protocol, false, host, port, null, "a", "a", noRetries);
+
+    // expect disconnect/reject as not role b
+    try (Connection connection = rejectedFactory.createConnection()) {
+       connection.start();
+       fail("Expect to be rejected as not in role b");
+    } catch (Exception expectedButNotSpecificDueToDifferentProtocolsInPlay) {
+       // deliberately empty: the exception type varies with the protocol under test
+    }
+
+    // expect to be accepted, b has role b
+    ConnectionFactory acceptedFactory = createFactory(protocol, false, host, port, null, "b", "b");
+    try (Connection connection = acceptedFactory.createConnection()) {
+       connection.start();
+    }
+ }
+
private boolean checkLocalHostname(String host) {
try {
return InetAddress.getByName(host).isLoopbackAddress();