You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/25 15:02:28 UTC
[4/4] activemq-artemis git commit: ARTEMIS-1391 embedding 2 MQTT
brokers is broken
ARTEMIS-1391 embedding 2 MQTT brokers is broken
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/144dbadc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/144dbadc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/144dbadc
Branch: refs/heads/master
Commit: 144dbadcb590344b89e2e81cc4a1141e5a27f22d
Parents: 53c8ee0
Author: Jens Reimann <jr...@redhat.com>
Authored: Thu Sep 7 17:23:09 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 25 11:00:56 2017 -0400
----------------------------------------------------------------------
.../protocol/mqtt/MQTTConnectionManager.java | 9 +---
.../core/protocol/mqtt/MQTTProtocolManager.java | 9 ++++
.../integration/mqtt/imported/MQTTFQQNTest.java | 6 ---
.../imported/MQTTInterceptorPropertiesTest.java | 18 +++----
.../integration/mqtt/imported/MQTTTest.java | 54 +++++++++++++++++---
.../integration/plugin/MqttPluginTest.java | 20 --------
6 files changed, 64 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 02e1c66..79b97a3 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
-import java.util.Set;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
@@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* MQTTConnectionManager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
@@ -39,9 +37,6 @@ public class MQTTConnectionManager {
private MQTTSession session;
- //TODO Read in a list of existing client IDs from stored Sessions.
- public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<>();
-
private MQTTLogger log = MQTTLogger.LOGGER;
private boolean isWill = false;
@@ -149,7 +144,7 @@ public class MQTTConnectionManager {
session.getSessionState().setAttached(false);
String clientId = session.getSessionState().getClientId();
if (clientId != null) {
- CONNECTED_CLIENTS.remove(clientId);
+ session.getProtocolManager().getConnectedClients().remove(clientId);
}
}
}
@@ -181,7 +176,7 @@ public class MQTTConnectionManager {
// [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
return null;
}
- } else if (!CONNECTED_CLIENTS.add(clientId)) {
+ } else if (!session.getProtocolManager().getConnectedClients().add(clientId)) {
// ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 6118b0d..8ee4033 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -38,6 +39,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* MQTTProtocolManager
@@ -52,6 +54,9 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
+ //TODO Read in a list of existing client IDs from stored Sessions.
+ private Set<String> connectedClients = new ConcurrentHashSet<>();
+
MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
@@ -172,4 +177,8 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
+
+ public Set<String> getConnectedClients() {
+ return connectedClients;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
index 4f0b229..acbf5d7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,10 +42,6 @@ public class MQTTFQQNTest extends MQTTTestSupport {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
-
- Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
- connectedClients.setAccessible(true);
- connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
index 2600952..c95a462 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
@@ -16,27 +16,25 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.felix.resolver.util.ArrayMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override
@@ -45,10 +43,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
-
- Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
- connectedClients.setAccessible(true);
- connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index e3c4856..9087938 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -40,15 +40,15 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -85,12 +85,7 @@ public class MQTTTest extends MQTTTestSupport {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
-
- Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
- connectedClients.setAccessible(true);
- connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
-
}
@Test
@@ -1990,4 +1985,49 @@ public class MQTTTest extends MQTTTestSupport {
}
assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
}
+
+ @Test
+ public void testDoubleBroker() throws Exception {
+ /*
+ * Start two embedded server instances for MQTT and connect to them
+ * with the same MQTT client id. As those are two different instances
+ * connecting to them with the same client ID must succeed.
+ */
+
+ final int port1 = 1884;
+ final int port2 = 1885;
+
+ final Configuration cfg1 = createDefaultConfig(1, false);
+ cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT");
+
+ final Configuration cfg2 = createDefaultConfig(2, false);
+ cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT");
+
+ final ActiveMQServer server1 = createServer(cfg1);
+ server1.start();
+ final ActiveMQServer server2 = createServer(cfg2);
+ server2.start();
+
+ final String clientId = "client1";
+ final MQTT mqtt1 = createMQTTConnection(clientId, true);
+ final MQTT mqtt2 = createMQTTConnection(clientId, true);
+
+ mqtt1.setHost("localhost", port1);
+ mqtt2.setHost("localhost", port2);
+
+ final BlockingConnection connection1 = mqtt1.blockingConnection();
+ final BlockingConnection connection2 = mqtt2.blockingConnection();
+
+ try {
+ connection1.connect();
+ connection2.connect();
+ } catch (Exception e) {
+ fail("Connections should have worked.");
+ } finally {
+ if (connection1.isConnected())
+ connection1.disconnect();
+ if (connection2.isConnected())
+ connection2.disconnect();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
index 660df34..2365ae5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
@@ -16,20 +16,14 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
-import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
-import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
@@ -62,20 +56,6 @@ public class MqttPluginTest extends MQTTTestSupport {
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
- @Before
- public void setUp() throws Exception {
- Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
- sessions.setAccessible(true);
- sessions.set(null, new ConcurrentHashMap<>());
-
- Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
- connectedClients.setAccessible(true);
- connectedClients.set(null, new ConcurrentHashSet<>());
- super.setUp();
-
- }
-
- @Override
public void configureBroker() throws Exception {
super.configureBroker();
server.registerBrokerPlugin(verifier);