You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/10/09 12:30:14 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1454: Support SASL in
outgoing AMQP
Repository: activemq-artemis
Updated Branches:
refs/heads/master 30ba65a08 -> 88e1fdc78
ARTEMIS-1454: Support SASL in outgoing AMQP
Update ProtonHandler to allow for both client and server side SASL
and other related changes to allow for setting of client side mechanism
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cc8a0cb9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cc8a0cb9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cc8a0cb9
Branch: refs/heads/master
Commit: cc8a0cb90eba71aa595f0ed15814ae3bec326af2
Parents: 30ba65a
Author: Robert Godfrey <rg...@apache.org>
Authored: Wed Sep 27 23:45:14 2017 +0200
Committer: rgodfrey <rg...@apache.org>
Committed: Mon Oct 9 10:05:35 2017 +0200
----------------------------------------------------------------------
.../ActiveMQProtonRemotingConnection.java | 4 +
.../amqp/broker/ProtonProtocolManager.java | 2 +-
.../client/AMQPClientConnectionFactory.java | 6 +-
.../client/ProtonClientConnectionManager.java | 16 +-
.../amqp/proton/AMQPConnectionContext.java | 106 ++++++------
.../amqp/proton/handler/EventHandler.java | 62 ++++---
.../amqp/proton/handler/ProtonHandler.java | 169 +++++++++++++++----
.../artemis/protocol/amqp/sasl/ClientSASL.java | 23 +++
.../protocol/amqp/sasl/ClientSASLFactory.java | 21 +++
.../amqp/AmqpOutboundConnectionTest.java | 91 +++++++++-
10 files changed, 367 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index c3533eb..fb6ca0a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -175,4 +175,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
public String getClientID() {
return amqpConnection.getContainer();
}
+
+ public void open() {
+ amqpConnection.open();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 8f88d8f..cd35664 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -124,7 +124,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
String id = server.getConfiguration().getName();
boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, null, null);
Executor executor = server.getExecutorFactory().getExecutor();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 4e532bb..c633db8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.Symbol;
@@ -49,19 +50,18 @@ public class AMQPClientConnectionFactory {
this.useCoreSubscriptionNaming = false;
}
- public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
+ public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
Executor executor = server.getExecutorFactory().getExecutor();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), false, clientSASLFactory, connectionProperties);
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
connectionCallback.setProtonConnectionDelegate(delegate);
- amqpConnection.open(connectionProperties);
return delegate;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
index ec9136f..df0de77 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@@ -40,16 +41,19 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class);
private final AMQPClientConnectionFactory connectionFactory;
private final Optional<EventHandler> eventHandler;
+ private final ClientSASLFactory clientSASLFactory;
- public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler) {
+ public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
this.connectionFactory = connectionFactory;
this.eventHandler = eventHandler;
+ this.clientSASLFactory = clientSASLFactory;
}
@Override
public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
- ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler);
+ ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler, clientSASLFactory);
connectionMap.put(connection.getID(), amqpConnection);
+ amqpConnection.open();
log.info("Connection " + amqpConnection.getRemoteAddress() + " created");
}
@@ -60,6 +64,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " destroyed");
connection.disconnect(false);
+ } else {
+ log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
}
}
@@ -69,6 +75,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage());
connection.fail(me);
+ } else {
+ log.error("Connection with id " + connectionID + " not found in connectionException");
}
}
@@ -78,6 +86,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " ready");
connection.getTransportConnection().fireReady(true);
+ } else {
+ log.error("Connection with id " + connectionID + " not found in connectionReadyForWrites()!");
}
}
@@ -92,6 +102,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
RemotingConnection connection = connectionMap.get(connectionID);
if (connection != null) {
connection.bufferReceived(connectionID, buffer);
+ } else {
+ log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 680111a..5d376f1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
@@ -68,6 +69,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
protected AMQPConnectionCallback connectionCallback;
private final String containerId;
+ private final boolean isIncomingConnection;
+ private final ClientSASLFactory saslClientFactory;
private final Map<Symbol, Object> connectionProperties = new HashMap<>();
private final ScheduledExecutorService scheduledPool;
@@ -84,19 +87,28 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
int maxFrameSize,
int channelMax,
boolean useCoreSubscriptionNaming,
- ScheduledExecutorService scheduledPool) {
+ ScheduledExecutorService scheduledPool,
+ boolean isIncomingConnection,
+ ClientSASLFactory saslClientFactory,
+ Map<Symbol, Object> connectionProperties) {
this.protocolManager = protocolManager;
this.connectionCallback = connectionSP;
this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
+ this.isIncomingConnection = isIncomingConnection;
+ this.saslClientFactory = saslClientFactory;
- connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
- connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
+ this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
+ this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
+
+ if (connectionProperties != null) {
+ this.connectionProperties.putAll(connectionProperties);
+ }
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
- this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
+ this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
handler.addEventHandler(this);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
@@ -106,6 +118,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
transport.setChannelMax(channelMax);
transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
transport.setMaxFrameSize(maxFrameSize);
+ if (!isIncomingConnection && saslClientFactory != null) {
+ handler.createClientSASL();
+ }
+ }
+
+ public boolean isIncomingConnection() {
+ return isIncomingConnection;
+ }
+
+ public ClientSASLFactory getSaslClientFactory() {
+ return saslClientFactory;
}
protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
@@ -232,7 +255,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return ExtCapability.getCapabilities();
}
- public void open(Map<Symbol, Object> connectionProperties) {
+ public void open() {
handler.open(containerId, connectionProperties);
}
@@ -271,56 +294,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
@Override
- public void onInit(Connection connection) throws Exception {
-
- }
-
- @Override
- public void onLocalOpen(Connection connection) throws Exception {
-
- }
-
- @Override
- public void onLocalClose(Connection connection) throws Exception {
-
- }
-
- @Override
- public void onFinal(Connection connection) throws Exception {
-
- }
-
- @Override
- public void onInit(Session session) throws Exception {
-
- }
-
- @Override
- public void onFinal(Session session) throws Exception {
-
- }
-
- @Override
- public void onInit(Link link) throws Exception {
-
- }
-
- @Override
- public void onLocalOpen(Link link) throws Exception {
-
- }
-
- @Override
- public void onLocalClose(Link link) throws Exception {
-
- }
-
- @Override
- public void onFinal(Link link) throws Exception {
-
- }
-
- @Override
public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
if (sasl) {
// configured mech in decreasing order of preference
@@ -344,6 +317,25 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
@Override
+ public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) {
+ if (saslClientFactory != null) {
+ handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms));
+ }
+ }
+
+ @Override
+ public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
+ connectionCallback.close();
+ handler.close(null);
+ }
+
+ @Override
+ public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) {
+ connection.open();
+ flush();
+ }
+
+ @Override
public void onTransport(Transport transport) {
handler.flushBytes();
}
@@ -438,10 +430,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
@Override
- public void onLocalClose(Session session) throws Exception {
- }
-
- @Override
public void onRemoteClose(Session session) throws Exception {
lock();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 8b99284..34fba0c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -29,58 +29,66 @@ import org.apache.qpid.proton.engine.Transport;
*/
public interface EventHandler {
- void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
+ default void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { }
- void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech);
+ default void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) { }
- void onInit(Connection connection) throws Exception;
+ default void onAuthFailed(ProtonHandler protonHandler, Connection connection) { }
- void onLocalOpen(Connection connection) throws Exception;
+ default void onAuthSuccess(ProtonHandler protonHandler, Connection connection) { }
- void onRemoteOpen(Connection connection) throws Exception;
+ default void onSaslMechanismsOffered(ProtonHandler handler, String[] mechanisms) { }
- void onLocalClose(Connection connection) throws Exception;
+ default void onInit(Connection connection) throws Exception { }
- void onRemoteClose(Connection connection) throws Exception;
+ default void onLocalOpen(Connection connection) throws Exception { }
- void onFinal(Connection connection) throws Exception;
+ default void onRemoteOpen(Connection connection) throws Exception { }
- void onInit(Session session) throws Exception;
+ default void onLocalClose(Connection connection) throws Exception { }
- void onLocalOpen(Session session) throws Exception;
+ default void onRemoteClose(Connection connection) throws Exception { }
- void onRemoteOpen(Session session) throws Exception;
+ default void onFinal(Connection connection) throws Exception { }
- void onLocalClose(Session session) throws Exception;
+ default void onInit(Session session) throws Exception { }
- void onRemoteClose(Session session) throws Exception;
+ default void onLocalOpen(Session session) throws Exception { }
- void onFinal(Session session) throws Exception;
+ default void onRemoteOpen(Session session) throws Exception { }
- void onInit(Link link) throws Exception;
+ default void onLocalClose(Session session) throws Exception { }
- void onLocalOpen(Link link) throws Exception;
+ default void onRemoteClose(Session session) throws Exception { }
- void onRemoteOpen(Link link) throws Exception;
+ default void onFinal(Session session) throws Exception { }
- void onLocalClose(Link link) throws Exception;
+ default void onInit(Link link) throws Exception { }
- void onRemoteClose(Link link) throws Exception;
+ default void onLocalOpen(Link link) throws Exception { }
- void onFlow(Link link) throws Exception;
+ default void onRemoteOpen(Link link) throws Exception { }
- void onFinal(Link link) throws Exception;
+ default void onLocalClose(Link link) throws Exception { }
- void onRemoteDetach(Link link) throws Exception;
+ default void onRemoteClose(Link link) throws Exception { }
- void onLocalDetach(Link link) throws Exception;
+ default void onFlow(Link link) throws Exception { }
- void onDelivery(Delivery delivery) throws Exception;
+ default void onFinal(Link link) throws Exception { }
- void onTransport(Transport transport) throws Exception;
+ default void onRemoteDetach(Link link) throws Exception { }
- void pushBytes(ByteBuf bytes);
+ default void onLocalDetach(Link link) throws Exception { }
- boolean flowControl(ReadyListener readyListener);
+ default void onDelivery(Delivery delivery) throws Exception { }
+
+ default void onTransport(Transport transport) throws Exception { }
+
+ default void pushBytes(ByteBuf bytes) { }
+
+ default boolean flowControl(ReadyListener readyListener) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 918b383..54201ea 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+import javax.security.auth.Subject;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -25,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -60,14 +63,17 @@ public class ProtonHandler extends ProtonInitializable {
private List<EventHandler> handlers = new ArrayList<>();
- private Sasl serverSasl;
+ private Sasl sasl;
private ServerSASL chosenMechanism;
+ private ClientSASL clientSASLMechanism;
private final ReentrantLock lock = new ReentrantLock();
private final long creationTime;
+ private final boolean isServer;
+
private SASLResult saslResult;
protected volatile boolean dataReceived;
@@ -80,12 +86,13 @@ public class ProtonHandler extends ProtonInitializable {
boolean inDispatch = false;
- public ProtonHandler(Executor flushExecutor) {
+ public ProtonHandler(Executor flushExecutor, boolean isServer) {
this.flushExecutor = flushExecutor;
this.readyListener = () -> flushExecutor.execute(() -> {
flush();
});
this.creationTime = System.currentTimeMillis();
+ this.isServer = isServer;
transport.bind(connection);
connection.collect(collector);
}
@@ -157,9 +164,9 @@ public class ProtonHandler extends ProtonInitializable {
}
public void createServerSASL(String[] mechanisms) {
- this.serverSasl = transport.sasl();
- this.serverSasl.server();
- serverSasl.setMechanisms(mechanisms);
+ this.sasl = transport.sasl();
+ this.sasl.server();
+ sasl.setMechanisms(mechanisms);
}
public void flushBytes() {
@@ -210,7 +217,11 @@ public class ProtonHandler extends ProtonInitializable {
try {
byte auth = buffer.getByte(4);
if (auth == SASL || auth == BARE) {
- dispatchAuth(auth == SASL);
+ if (isServer) {
+ dispatchAuth(auth == SASL);
+ } else if (auth == BARE && clientSASLMechanism == null) {
+ dispatchAuthSuccess();
+ }
/*
* there is a chance that if SASL Handshake has been carried out that the capacity may change.
* */
@@ -260,7 +271,7 @@ public class ProtonHandler extends ProtonInitializable {
lock.lock();
try {
transport.process();
- checkServerSASL();
+ checkSASL();
} finally {
lock.unlock();
}
@@ -282,52 +293,131 @@ public class ProtonHandler extends ProtonInitializable {
flush();
}
- protected void checkServerSASL() {
- if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) {
+ protected void checkSASL() {
+ if (isServer) {
+ if (sasl != null && sasl.getRemoteMechanisms().length > 0) {
- if (chosenMechanism == null) {
- if (log.isTraceEnabled()) {
- log.trace("SASL chosenMechanism: " + serverSasl.getRemoteMechanisms()[0]);
+ if (chosenMechanism == null) {
+ if (log.isTraceEnabled()) {
+ log.trace("SASL chosenMechanism: " + sasl.getRemoteMechanisms()[0]);
+ }
+ dispatchRemoteMechanismChosen(sasl.getRemoteMechanisms()[0]);
}
- dispatchRemoteMechanismChosen(serverSasl.getRemoteMechanisms()[0]);
- }
- if (chosenMechanism != null) {
+ if (chosenMechanism != null) {
- byte[] dataSASL = new byte[serverSasl.pending()];
- serverSasl.recv(dataSASL, 0, dataSASL.length);
+ byte[] dataSASL = new byte[sasl.pending()];
+ sasl.recv(dataSASL, 0, dataSASL.length);
- if (log.isTraceEnabled()) {
- log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
- }
+ if (log.isTraceEnabled()) {
+ log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
+ }
- byte[] response = chosenMechanism.processSASL(dataSASL);
- if (response != null) {
- serverSasl.send(response, 0, response.length);
- }
- saslResult = chosenMechanism.result();
+ byte[] response = chosenMechanism.processSASL(dataSASL);
+ if (response != null) {
+ sasl.send(response, 0, response.length);
+ }
+ saslResult = chosenMechanism.result();
- if (saslResult != null) {
- if (saslResult.isSuccess()) {
- saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
- } else {
- saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
+ if (saslResult != null) {
+ if (saslResult.isSuccess()) {
+ saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
+ } else {
+ saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
+ }
}
+ } else {
+ // no auth available, system error
+ saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
+ }
+ }
+ } else {
+ if (sasl != null) {
+ switch (sasl.getState()) {
+ case PN_SASL_IDLE:
+ if (sasl.getRemoteMechanisms().length != 0) {
+ dispatchMechanismsOffered(sasl.getRemoteMechanisms());
+
+ if (clientSASLMechanism == null) {
+ log.infof("Outbound connection failed - unknown mechanism, offered mechanisms: %s",
+ Arrays.asList(sasl.getRemoteMechanisms()));
+ sasl = null;
+ dispatchAuthFailed();
+ } else {
+ sasl.setMechanisms(clientSASLMechanism.getName());
+ byte[] initialResponse = clientSASLMechanism.getInitialResponse();
+ if (initialResponse != null) {
+ sasl.send(initialResponse, 0, initialResponse.length);
+ }
+ }
+ }
+ break;
+ case PN_SASL_STEP:
+ int challengeSize = sasl.pending();
+ byte[] challenge = new byte[challengeSize];
+ sasl.recv(challenge, 0, challengeSize);
+ byte[] response = clientSASLMechanism.getResponse(challenge);
+ sasl.send(response, 0, response.length);
+ break;
+ case PN_SASL_FAIL:
+ log.info("Outbound connection failed, authentication failure");
+ sasl = null;
+ dispatchAuthFailed();
+ break;
+ case PN_SASL_PASS:
+ log.debug("Outbound connection succeeded");
+ saslResult = new SASLResult() {
+ @Override
+ public String getUser() {
+ return null;
+ }
+
+ @Override
+ public Subject getSubject() {
+ return null;
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return true;
+ }
+ };
+ sasl = null;
+
+ dispatchAuthSuccess();
+ break;
+ case PN_SASL_CONF:
+ // do nothing
+ break;
}
- } else {
- // no auth available, system error
- saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
}
}
}
private void saslComplete(Sasl.SaslOutcome saslOutcome) {
- serverSasl.done(saslOutcome);
- serverSasl = null;
+ sasl.done(saslOutcome);
+ sasl = null;
if (chosenMechanism != null) {
chosenMechanism.done();
}
}
+ private void dispatchAuthFailed() {
+ for (EventHandler h : handlers) {
+ h.onAuthFailed(this, getConnection());
+ }
+ }
+
+ private void dispatchAuthSuccess() {
+ for (EventHandler h : handlers) {
+ h.onAuthSuccess(this, getConnection());
+ }
+ }
+
+ private void dispatchMechanismsOffered(final String[] mechs) {
+ for (EventHandler h : handlers) {
+ h.onSaslMechanismsOffered(this, mechs);
+ }
+ }
private void dispatchAuth(boolean sasl) {
for (EventHandler h : handlers) {
h.onAuthInit(this, getConnection(), sasl);
@@ -393,4 +483,13 @@ public class ProtonHandler extends ProtonInitializable {
public void setChosenMechanism(ServerSASL chosenMechanism) {
this.chosenMechanism = chosenMechanism;
}
+
+ public void setClientMechanism(final ClientSASL saslClientMech) {
+ this.clientSASLMechanism = saslClientMech;
+ }
+
+ public void createClientSASL() {
+ this.sasl = transport.sasl();
+ this.sasl.client();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java
new file mode 100644
index 0000000..cd132a9
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+public interface ClientSASL {
+ String getName();
+ byte[] getInitialResponse();
+ byte[] getResponse(byte[] challenge);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java
new file mode 100644
index 0000000..f6e9e25
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+public interface ClientSASLFactory {
+ ClientSASL chooseMechanism(String[] availableMechanims);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
index 3d8be49..16589fc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -28,16 +31,37 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFac
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
+ private boolean securityEnabled;
+
@Test(timeout = 60000)
public void testOutboundConnection() throws Throwable {
- final ActiveMQServer remote = createServer(AMQP_PORT + 1);
- remote.start();
+ runOutboundConnectionTest(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testOutboundConnectionWithSecurity() throws Throwable {
+ runOutboundConnectionTest(true);
+ }
+
+
+ private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
+ final ActiveMQServer remote;
+ try {
+ securityEnabled = withSecurity;
+ remote = createServer(AMQP_PORT + 1);
+ } finally {
+ securityEnabled = false;
+ }
try {
Wait.waitFor(remote::isActive);
} catch (Exception e) {
@@ -45,10 +69,30 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
throw e;
}
- final Map<String, Object> config = new LinkedHashMap<>();
- config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ final Map<String, Object> config = new LinkedHashMap<>(); config.put(TransportConstants.HOST_PROP_NAME, "localhost");
config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
- ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
+ final ClientSASLFactory clientSASLFactory;
+ if (withSecurity) {
+ clientSASLFactory = availableMechanims -> {
+ if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
+ return new PlainSASLMechanism(fullUser, fullPass);
+ } else {
+ return null;
+ }
+ };
+ } else {
+ clientSASLFactory = null;
+ }
+ final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void onRemoteOpen(Connection connection) throws Exception {
+ connectionOpened.set(true);
+ }
+ };
+
+ ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.of(eventHandler), clientSASLFactory);
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
connector.start();
@@ -57,7 +101,8 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
try {
Wait.waitFor(() -> remote.getConnectionCount() > 0);
assertEquals(1, remote.getConnectionCount());
-
+ Wait.waitFor(connectionOpened::get);
+ assertTrue("Remote connection was not opened - authentication error?", connectionOpened.get());
lifeCycleListener.stop();
Wait.waitFor(() -> remote.getConnectionCount() == 0);
@@ -67,4 +112,38 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
remote.stop();
}
}
+
+ @Override
+ protected boolean isSecurityEnabled() {
+ return securityEnabled;
+ }
+
+ private static class PlainSASLMechanism implements ClientSASL {
+
+ private final byte[] initialResponse;
+
+ PlainSASLMechanism(String username, String password) {
+ byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+ byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+ byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2];
+ System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length);
+ System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length);
+ initialResponse = encoded;
+ }
+
+ @Override
+ public String getName() {
+ return "PLAIN";
+ }
+
+ @Override
+ public byte[] getInitialResponse() {
+ return initialResponse;
+ }
+
+ @Override
+ public byte[] getResponse(byte[] challenge) {
+ return new byte[0];
+ }
+ }
}
[2/2] activemq-artemis git commit: This closes #1579
Posted by ma...@apache.org.
This closes #1579
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/88e1fdc7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/88e1fdc7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/88e1fdc7
Branch: refs/heads/master
Commit: 88e1fdc789bc91af1abf09dcc8e67f5c1e68f8b8
Parents: 30ba65a cc8a0cb
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Oct 9 13:29:49 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Oct 9 13:29:49 2017 +0100
----------------------------------------------------------------------
.../ActiveMQProtonRemotingConnection.java | 4 +
.../amqp/broker/ProtonProtocolManager.java | 2 +-
.../client/AMQPClientConnectionFactory.java | 6 +-
.../client/ProtonClientConnectionManager.java | 16 +-
.../amqp/proton/AMQPConnectionContext.java | 106 ++++++------
.../amqp/proton/handler/EventHandler.java | 62 ++++---
.../amqp/proton/handler/ProtonHandler.java | 169 +++++++++++++++----
.../artemis/protocol/amqp/sasl/ClientSASL.java | 23 +++
.../protocol/amqp/sasl/ClientSASLFactory.java | 21 +++
.../amqp/AmqpOutboundConnectionTest.java | 91 +++++++++-
10 files changed, 367 insertions(+), 133 deletions(-)
----------------------------------------------------------------------