You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/25 15:02:26 UTC
[2/4] activemq-artemis git commit: ARTEMIS-1218 implement MQTT link stealing
ARTEMIS-1218 implement MQTT link stealing
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dac62517
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dac62517
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dac62517
Branch: refs/heads/master
Commit: dac625179ac95e4953be3cb2ae6e2a8513235b38
Parents: 144dbad
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Sep 15 10:59:57 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 25 11:00:56 2017 -0400
----------------------------------------------------------------------
.../protocol/mqtt/MQTTConnectionManager.java | 19 +++--
.../core/protocol/mqtt/MQTTProtocolManager.java | 24 ++++--
.../integration/mqtt/imported/MQTTTest.java | 77 +++++++++-----------
3 files changed, 66 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dac62517/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 79b97a3..7e88028 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -143,8 +143,12 @@ public class MQTTConnectionManager {
if (session.getSessionState() != null) {
session.getSessionState().setAttached(false);
String clientId = session.getSessionState().getClientId();
- if (clientId != null) {
- session.getProtocolManager().getConnectedClients().remove(clientId);
+ /**
+ * ensure that the connection for the client ID matches *this* connection otherwise we could remove the
+ * entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
+ */
+ if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) {
+ session.getProtocolManager().removeConnectedClient(clientId);
}
}
}
@@ -176,12 +180,13 @@ public class MQTTConnectionManager {
// [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
return null;
}
- } else if (!session.getProtocolManager().getConnectedClients().add(clientId)) {
- // ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it.
-
+ } else {
+ MQTTConnection connection = session.getProtocolManager().addConnectedClient(clientId, session.getConnection());
- // [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID
- return null;
+ if (connection != null) {
+ // [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
+ connection.disconnect(false);
+ }
}
return clientId;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dac62517/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 8ee4033..c8832ba 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -19,7 +19,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -39,7 +40,6 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* MQTTProtocolManager
@@ -55,7 +55,7 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
//TODO Read in a list of existing client IDs from stored Sessions.
- private Set<String> connectedClients = new ConcurrentHashSet<>();
+ private Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
@@ -178,7 +178,21 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
- public Set<String> getConnectedClients() {
- return connectedClients;
+ public boolean isClientConnected(String clientId, MQTTConnection connection) {
+ return connection != null && connection.equals(connectedClients.get(clientId)); // null-safe: false when clientId has no registered connection
+ }
+
+ public void removeConnectedClient(String clientId) {
+ connectedClients.remove(clientId);
+ }
+
+ /**
+ * @param clientId
+ * @param connection
+ * @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for
+ * the {@code clientId}
+ */
+ public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
+ return connectedClients.put(clientId, connection);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dac62517/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 9087938..8a1eea2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -56,7 +57,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.MQTTException;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
@@ -1350,11 +1350,8 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect();
}
- @Ignore
@Test(timeout = 60 * 1000)
- // TODO We currently do not support link stealing. This needs to be enabled for this test to pass.
public void testDuplicateClientId() throws Exception {
- // test link stealing enabled by default
final String clientId = "duplicateClient";
MQTT mqtt = createMQTTConnection(clientId, false);
mqtt.setKeepAlive((short) 2);
@@ -1384,31 +1381,45 @@ public class MQTTTest extends MQTTTestSupport {
connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
connection1.disconnect();
+ }
- // disable link stealing
- stopBroker();
- protocolConfig = "allowLinkStealing=false";
- startBroker();
+ @Test(timeout = 60 * 1000)
+ public void testRepeatedLinkStealing() throws Exception {
+ final String clientId = "duplicateClient";
+ final AtomicReference<BlockingConnection> oldConnection = new AtomicReference<>();
+ final String TOPICA = "TopicA";
- mqtt = createMQTTConnection(clientId, false);
- mqtt.setKeepAlive((short) 2);
- final BlockingConnection connection2 = mqtt.blockingConnection();
- connection2.connect();
- connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+ for (int i = 1; i <= 10; ++i) {
- mqtt1 = createMQTTConnection(clientId, false);
- mqtt1.setKeepAlive((short) 2);
- final BlockingConnection connection3 = mqtt1.blockingConnection();
- try {
- connection3.connect();
- fail("Duplicate client connected");
- } catch (Exception e) {
- // ignore
+ LOG.info("Creating MQTT Connection {}", i);
+
+ MQTT mqtt = createMQTTConnection(clientId, false);
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return connection.isConnected();
+ }
+ }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200)));
+
+ if (oldConnection.get() != null) {
+ assertTrue("Old client still connected on attempt: " + i, Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return !oldConnection.get().isConnected();
+ }
+ }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200)));
+ }
+
+ oldConnection.set(connection);
}
- assertTrue("Old client disconnected", connection2.isConnected());
- connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
- connection2.disconnect();
+ oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+ oldConnection.get().disconnect();
}
@Test(timeout = 30 * 10000)
@@ -1969,24 +1980,6 @@ public class MQTTTest extends MQTTTestSupport {
}
@Test
- public void testDuplicateIDReturnsError() throws Exception {
- String clientId = "clientId";
- MQTT mqtt = createMQTTConnection();
- mqtt.setClientId(clientId);
- mqtt.blockingConnection().connect();
-
- MQTTException e = null;
- try {
- MQTT mqtt2 = createMQTTConnection();
- mqtt2.setClientId(clientId);
- mqtt2.blockingConnection().connect();
- } catch (MQTTException mqttE) {
- e = mqttE;
- }
- assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
- }
-
- @Test
public void testDoubleBroker() throws Exception {
/*
* Start two embedded server instances for MQTT and connect to them