You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/09/15 15:17:10 UTC
[1/2] activemq-artemis git commit: ARTEMIS-728 Broker doesn't support
unique jms client-id (qpid-jms client)
Repository: activemq-artemis
Updated Branches:
refs/heads/master 1f392da88 -> 646a89198
ARTEMIS-728 Broker doesn't support unique jms client-id (qpid-jms client)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/406d09d9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/406d09d9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/406d09d9
Branch: refs/heads/master
Commit: 406d09d986f19fd8f5f0aa2fb0973fa425106dcc
Parents: 1f392da
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Sep 12 16:54:06 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 15 11:17:02 2016 -0400
----------------------------------------------------------------------
.../plug/ActiveMQProtonConnectionCallback.java | 79 ++++++++++++++++++++
.../org/proton/plug/AMQPConnectionCallback.java | 5 ++
.../main/java/org/proton/plug/AmqpSupport.java | 1 +
.../plug/context/AbstractConnectionContext.java | 29 +++++--
.../plug/context/ProtonInitializable.java | 2 +-
.../server/ProtonServerConnectionContext.java | 16 +++-
.../org/proton/plug/handler/ExtCapability.java | 46 ++++++++++++
.../plug/handler/impl/ProtonHandlerImpl.java | 1 -
.../context/AbstractConnectionContextTest.java | 10 +++
.../proton/plug/test/invm/ProtonINVMSPI.java | 20 +++++
.../plug/test/minimalclient/AMQPClientSPI.java | 11 +++
.../minimalserver/MinimalConnectionSPI.java | 11 +++
.../tests/integration/proton/ProtonTest.java | 52 +++++++++----
13 files changed, 258 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index 6e6a405..707b312 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.proton.plug;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -23,19 +25,35 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
+import org.proton.plug.handler.ExtCapability;
import org.proton.plug.sasl.AnonymousServerSASL;
+import org.proton.plug.sasl.PlainSASLResult;
+
+import static org.proton.plug.AmqpSupport.CONTAINER_ID;
+import static org.proton.plug.AmqpSupport.INVALID_FIELD;
+import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback {
+ private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
private final ProtonProtocolManager manager;
@@ -49,6 +67,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
private final Executor closeExecutor;
+ private ServerSession internalSession;
+
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
Connection connection,
Executor closeExecutor) {
@@ -86,7 +106,42 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
}
@Override
+ public void init() throws Exception {
+ //This internal core session is used to represent the connection
+ //in the core server. It is used to identify unique clientIDs.
+ //Note the Qpid-JMS client does create an initial session
+ //for each connection. However, it comes in as a Begin
+ //after Open. This makes it unusable for this purpose,
+ //as we need to decide the uniqueness in response to
+ //Open, and checking uniqueness and adding the unique
+ //client-id to the server need to be atomic.
+ if (internalSession == null) {
+ SASLResult saslResult = amqpConnection.getSASLResult();
+ String user = null;
+ String passcode = null;
+ if (saslResult != null) {
+ user = saslResult.getUser();
+ if (saslResult instanceof PlainSASLResult) {
+ passcode = ((PlainSASLResult) saslResult).getPassword();
+ }
+ }
+ internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection,
+ false,
+ false,
+ false,
+ false,
+ null, (SessionCallback) createSessionCallback(this.amqpConnection), true);
+ }
+ }
+
+ @Override
public void close() {
+ try {
+ internalSession.close(false);
+ }
+ catch (Exception e) {
+ log.error("error closing internal session", e);
+ }
connection.close();
amqpConnection.close();
}
@@ -151,4 +206,28 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
public void sendSASLSupported() {
connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
}
+
+ @Override
+ public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
+ String remote = connection.getRemoteContainer();
+
+ if (ExtCapability.needUniqueConnection(connection)) {
+ if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) {
+ //https://issues.apache.org/jira/browse/ARTEMIS-728
+ Map<Symbol, Object> connProp = new HashMap<>();
+ connProp.put(CONNECTION_OPEN_FAILED, "true");
+ connection.setProperties(connProp);
+ connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
+ Map<Symbol, Symbol> info = new HashMap<>();
+ info.put(INVALID_FIELD, CONTAINER_ID);
+ connection.getCondition().setInfo(info);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private String createInternalSessionName() {
+ return "amqp:" + UUIDGenerator.getInstance().generateStringUUID();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
index 199d68d..df14b0f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -17,9 +17,12 @@
package org.proton.plug;
import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.engine.Connection;
public interface AMQPConnectionCallback {
+ void init() throws Exception;
+
void close();
/**
@@ -41,4 +44,6 @@ public interface AMQPConnectionCallback {
boolean isSupportsAnonymous();
void sendSASLSupported();
+
+ boolean validateConnection(Connection connection, SASLResult saslResult);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
index 1580855..4ddbbcc 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
@@ -60,6 +60,7 @@ public class AmqpSupport {
// Lifetime policy symbols
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+ public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
/**
* Search for a given Symbol in a given array of Symbol object.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index 9ece790..120a37b 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -188,6 +188,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
return null;
}
+ protected boolean validateConnection(Connection connection) {
+ return true;
+ }
+
+ protected void initInternal() throws Exception {
+ }
+
// This listener will perform a bunch of things here
class LocalListener extends DefaultEventHandler {
@@ -213,13 +220,25 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
@Override
public void onRemoteOpen(Connection connection) throws Exception {
synchronized (getLock()) {
- connection.setContext(AbstractConnectionContext.this);
- connection.setContainer(containerId);
- connection.setProperties(connectionProperties);
- connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
- connection.open();
+ try {
+ initInternal();
+ }
+ catch (Exception e) {
+ log.error("Error init connection", e);
+ }
+ if (!validateConnection(connection)) {
+ connection.close();
+ }
+ else {
+ connection.setContext(AbstractConnectionContext.this);
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+ connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.open();
+ }
}
initialise();
+
/*
* This can be null which is in effect an empty map, also we really dont need to check this for in bound connections
* but its here in case we add support for outbound connections.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
index c065527..266e8b2 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
@@ -39,7 +39,7 @@ public class ProtonInitializable {
public void initialise() throws Exception {
if (!initialized) {
- initialized = false;
+ initialized = true;
try {
if (afterInit != null) {
afterInit.run();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
index 83048d1..efaaed4 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
@@ -16,10 +16,9 @@
*/
package org.proton.plug.context.server;
-import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
-
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
@@ -30,6 +29,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
+import org.proton.plug.handler.ExtCapability;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -59,6 +59,16 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
}
@Override
+ protected boolean validateConnection(Connection connection) {
+ return connectionCallback.validateConnection(connection, handler.getSASLResult());
+ }
+
+ @Override
+ protected void initInternal() throws Exception {
+ connectionCallback.init();
+ }
+
+ @Override
protected void remoteLinkOpened(Link link) throws Exception {
ProtonServerSessionContext protonSession = (ProtonServerSessionContext) getSessionExtension(link.getSession());
@@ -84,6 +94,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
@Override
public Symbol[] getConnectionCapabilitiesOffered() {
- return new Symbol[]{DELAYED_DELIVERY};
+ return ExtCapability.getCapabilities();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
new file mode 100644
index 0000000..cbb96fd
--- /dev/null
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.proton.plug.handler;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
+
+import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
+import static org.proton.plug.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
+
+public class ExtCapability {
+
+ public static final Symbol[] capabilities = new Symbol[] {
+ SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY
+ };
+
+ public static Symbol[] getCapabilities() {
+ return capabilities;
+ }
+
+ public static boolean needUniqueConnection(Connection connection) {
+ Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities();
+ if (extCapabilities != null) {
+ for (Symbol sym : extCapabilities) {
+ if (sym.compareTo(SOLE_CONNECTION_CAPABILITY) == 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
index 7208d16..b2f6406 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
@@ -386,5 +386,4 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index a9bb152..91af8f5 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.EventHandler;
@@ -72,6 +73,10 @@ public class AbstractConnectionContextTest {
private class TestConnectionCallback implements AMQPConnectionCallback {
@Override
+ public void init() throws Exception {
+ }
+
+ @Override
public void close() {
}
@@ -110,5 +115,10 @@ public class AbstractConnectionContextTest {
public void sendSASLSupported() {
}
+
+ @Override
+ public boolean validateConnection(Connection connection, SASLResult saslResult) {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index f9b2e9a..bf83f8a 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -21,10 +21,12 @@ import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.context.server.ProtonServerConnectionContext;
import org.proton.plug.sasl.AnonymousServerSASL;
@@ -59,6 +61,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}
@Override
+ public void init() throws Exception {
+ }
+
+ @Override
public void close() {
mainExecutor.shutdown();
}
@@ -79,6 +85,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}
@Override
+ public boolean validateConnection(Connection connection, SASLResult saslResult) {
+ return true;
+ }
+
+ @Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
if (log.isTraceEnabled()) {
ByteUtil.debugFrame(log, "InVM->", bytes);
@@ -126,6 +137,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
class ReturnSPI implements AMQPConnectionCallback {
@Override
+ public void init() throws Exception {
+ }
+
+ @Override
public void close() {
}
@@ -146,6 +161,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}
@Override
+ public boolean validateConnection(Connection connection, SASLResult saslResult) {
+ return true;
+ }
+
+ @Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
final int size = bytes.writerIndex();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
index 17e51c7..fbdee59 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -22,10 +22,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
@@ -53,6 +55,10 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
}
@Override
+ public void init() throws Exception {
+ }
+
+ @Override
public void close() {
}
@@ -72,6 +78,11 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
}
+ @Override
+ public boolean validateConnection(Connection connection, SASLResult saslResult) {
+ return true;
+ }
+
final ReusableLatch latch = new ReusableLatch(0);
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
index 02cb06e..055b29d 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -25,10 +25,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
@@ -49,6 +51,10 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@Override
+ public void init() throws Exception {
+ }
+
+ @Override
public void close() {
executorService.shutdown();
}
@@ -81,6 +87,11 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
}
@Override
+ public boolean validateConnection(Connection connection, SASLResult saslResult) {
+ return true;
+ }
+
+ @Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
final int bufferSize = bytes.writerIndex();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b3d9a5f..245c6b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -27,6 +27,7 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -1533,6 +1534,35 @@ public class ProtonTest extends ProtonTestBase {
connection.close();
}
+ @Test
+ public void testClientID() throws Exception {
+ Connection testConn1 = createConnection(false);
+ Connection testConn2 = createConnection(false);
+ try {
+ testConn1.setClientID("client-id1");
+ try {
+ testConn1.setClientID("client-id2");
+ fail("didn't get expected exception");
+ }
+ catch (javax.jms.IllegalStateException e) {
+ //expected
+ }
+
+ try {
+ testConn2.setClientID("client-id1");
+ fail("didn't get expected exception");
+ }
+ catch (InvalidClientIDException e) {
+ //expected
+ }
+ }
+ finally {
+ testConn1.close();
+ testConn2.close();
+ }
+
+ }
+
private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
@@ -1543,29 +1573,19 @@ public class ProtonTest extends ProtonTestBase {
}
}
- private javax.jms.Connection createConnection() throws JMSException {
+ private Connection createConnection() throws JMSException {
+ return this.createConnection(true);
+ }
+
+ private javax.jms.Connection createConnection(boolean isStart) throws JMSException {
Connection connection;
if (protocol == 3) {
factory = new JmsConnectionFactory(amqpConnectionUri);
connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.start();
}
else if (protocol == 0) {
factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.start();
}
else {
TransportConfiguration transport;
@@ -1579,6 +1599,8 @@ public class ProtonTest extends ProtonTestBase {
}
connection = factory.createConnection(userName, password);
+ }
+ if (isStart) {
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
[2/2] activemq-artemis git commit: This closes #769
Posted by cl...@apache.org.
This closes #769
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/646a8919
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/646a8919
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/646a8919
Branch: refs/heads/master
Commit: 646a8919885955f0123d41707e3ccb43ef3d5dbf
Parents: 1f392da 406d09d
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 15 11:17:03 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 15 11:17:03 2016 -0400
----------------------------------------------------------------------
.../plug/ActiveMQProtonConnectionCallback.java | 79 ++++++++++++++++++++
.../org/proton/plug/AMQPConnectionCallback.java | 5 ++
.../main/java/org/proton/plug/AmqpSupport.java | 1 +
.../plug/context/AbstractConnectionContext.java | 29 +++++--
.../plug/context/ProtonInitializable.java | 2 +-
.../server/ProtonServerConnectionContext.java | 16 +++-
.../org/proton/plug/handler/ExtCapability.java | 46 ++++++++++++
.../plug/handler/impl/ProtonHandlerImpl.java | 1 -
.../context/AbstractConnectionContextTest.java | 10 +++
.../proton/plug/test/invm/ProtonINVMSPI.java | 20 +++++
.../plug/test/minimalclient/AMQPClientSPI.java | 11 +++
.../minimalserver/MinimalConnectionSPI.java | 11 +++
.../tests/integration/proton/ProtonTest.java | 52 +++++++++----
13 files changed, 258 insertions(+), 25 deletions(-)
----------------------------------------------------------------------