You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/25 15:02:25 UTC

[1/4] activemq-artemis git commit: This closes #1539

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 53c8ee007 -> 28c96e100


This closes #1539


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/28c96e10
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/28c96e10
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/28c96e10

Branch: refs/heads/master
Commit: 28c96e1002cacb8f51e346c7c410befe593aeeaf
Parents: 53c8ee0 956c4c6
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Sep 25 11:00:56 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 25 11:00:56 2017 -0400

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTConnectionManager.java    |  24 +--
 .../core/protocol/mqtt/MQTTProtocolManager.java |  23 +++
 .../integration/mqtt/imported/MQTTFQQNTest.java |   6 -
 .../imported/MQTTInterceptorPropertiesTest.java |  18 +-
 .../integration/mqtt/imported/MQTTTest.java     | 199 ++++++++-----------
 .../integration/plugin/MqttPluginTest.java      |  20 --
 .../activemq/artemis/tests/util/Wait.java       |   4 +-
 7 files changed, 121 insertions(+), 173 deletions(-)
----------------------------------------------------------------------



[4/4] activemq-artemis git commit: ARTEMIS-1391 embedding 2 MQTT brokers is broken

Posted by cl...@apache.org.
ARTEMIS-1391 embedding 2 MQTT brokers is broken


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/144dbadc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/144dbadc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/144dbadc

Branch: refs/heads/master
Commit: 144dbadcb590344b89e2e81cc4a1141e5a27f22d
Parents: 53c8ee0
Author: Jens Reimann <jr...@redhat.com>
Authored: Thu Sep 7 17:23:09 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 25 11:00:56 2017 -0400

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTConnectionManager.java    |  9 +---
 .../core/protocol/mqtt/MQTTProtocolManager.java |  9 ++++
 .../integration/mqtt/imported/MQTTFQQNTest.java |  6 ---
 .../imported/MQTTInterceptorPropertiesTest.java | 18 +++----
 .../integration/mqtt/imported/MQTTTest.java     | 54 +++++++++++++++++---
 .../integration/plugin/MqttPluginTest.java      | 20 --------
 6 files changed, 64 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 02e1c66..79b97a3 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-import java.util.Set;
 import java.util.UUID;
 
 import io.netty.buffer.ByteBuf;
@@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 
 /**
  * MQTTConnectionManager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
@@ -39,9 +37,6 @@ public class MQTTConnectionManager {
 
    private MQTTSession session;
 
-   //TODO Read in a list of existing client IDs from stored Sessions.
-   public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<>();
-
    private MQTTLogger log = MQTTLogger.LOGGER;
 
    private boolean isWill = false;
@@ -149,7 +144,7 @@ public class MQTTConnectionManager {
             session.getSessionState().setAttached(false);
             String clientId = session.getSessionState().getClientId();
             if (clientId != null) {
-               CONNECTED_CLIENTS.remove(clientId);
+               session.getProtocolManager().getConnectedClients().remove(clientId);
             }
          }
       }
@@ -181,7 +176,7 @@ public class MQTTConnectionManager {
             // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
             return null;
          }
-      } else if (!CONNECTED_CLIENTS.add(clientId)) {
+      } else if (!session.getProtocolManager().getConnectedClients().add(clientId)) {
          // ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it.
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 6118b0d..8ee4033 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -38,6 +39,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 
 /**
  * MQTTProtocolManager
@@ -52,6 +54,9 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
    private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
    private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
 
+   //TODO Read in a list of existing client IDs from stored Sessions.
+   private Set<String> connectedClients = new ConcurrentHashSet<>();
+
    MQTTProtocolManager(ActiveMQServer server,
                        List<BaseInterceptor> incomingInterceptors,
                        List<BaseInterceptor> outgoingInterceptors) {
@@ -172,4 +177,8 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
    public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
       super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
    }
+
+   public Set<String> getConnectedClients() {
+      return connectedClients;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
index 4f0b229..acbf5d7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java
@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,10 +42,6 @@ public class MQTTFQQNTest extends MQTTTestSupport {
       Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
       sessions.setAccessible(true);
       sessions.set(null, new ConcurrentHashMap<>());
-
-      Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
-      connectedClients.setAccessible(true);
-      connectedClients.set(null, new ConcurrentHashSet<>());
       super.setUp();
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
index 2600952..c95a462 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
@@ -16,27 +16,25 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.felix.resolver.util.ArrayMap;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
 
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
 
    @Override
@@ -45,10 +43,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
       Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
       sessions.setAccessible(true);
       sessions.set(null, new ConcurrentHashMap<>());
-
-      Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
-      connectedClients.setAccessible(true);
-      connectedClients.set(null, new ConcurrentHashSet<>());
       super.setUp();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index e3c4856..9087938 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -40,15 +40,15 @@ import java.util.regex.Pattern;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.util.Wait;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -85,12 +85,7 @@ public class MQTTTest extends MQTTTestSupport {
       Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
       sessions.setAccessible(true);
       sessions.set(null, new ConcurrentHashMap<>());
-
-      Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
-      connectedClients.setAccessible(true);
-      connectedClients.set(null, new ConcurrentHashSet<>());
       super.setUp();
-
    }
 
    @Test
@@ -1990,4 +1985,49 @@ public class MQTTTest extends MQTTTestSupport {
       }
       assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
    }
+
+   @Test
+   public void testDoubleBroker() throws Exception {
+      /*
+       * Start two embedded server instances for MQTT and connect to them
+       * with the same MQTT client id. As those are two different instances
+       * connecting to them with the same client ID must succeed.
+       */
+
+      final int port1 = 1884;
+      final int port2 = 1885;
+
+      final Configuration cfg1 = createDefaultConfig(1, false);
+      cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT");
+
+      final Configuration cfg2 = createDefaultConfig(2, false);
+      cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT");
+
+      final ActiveMQServer server1 = createServer(cfg1);
+      server1.start();
+      final ActiveMQServer server2 = createServer(cfg2);
+      server2.start();
+
+      final String clientId = "client1";
+      final MQTT mqtt1 = createMQTTConnection(clientId, true);
+      final MQTT mqtt2 = createMQTTConnection(clientId, true);
+
+      mqtt1.setHost("localhost", port1);
+      mqtt2.setHost("localhost", port2);
+
+      final BlockingConnection connection1 = mqtt1.blockingConnection();
+      final BlockingConnection connection2 = mqtt2.blockingConnection();
+
+      try {
+         connection1.connect();
+         connection2.connect();
+      } catch (Exception e) {
+         fail("Connections should have worked.");
+      } finally {
+         if (connection1.isConnected())
+            connection1.disconnect();
+         if (connection2.isConnected())
+            connection2.disconnect();
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
index 660df34..2365ae5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
@@ -16,20 +16,14 @@
  */
 package org.apache.activemq.artemis.tests.integration.plugin;
 
-import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
@@ -62,20 +56,6 @@ public class MqttPluginTest extends MQTTTestSupport {
    private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
 
    @Override
-   @Before
-   public void setUp() throws Exception {
-      Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
-      sessions.setAccessible(true);
-      sessions.set(null, new ConcurrentHashMap<>());
-
-      Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
-      connectedClients.setAccessible(true);
-      connectedClients.set(null, new ConcurrentHashSet<>());
-      super.setUp();
-
-   }
-
-   @Override
    public void configureBroker() throws Exception {
       super.configureBroker();
       server.registerBrokerPlugin(verifier);


[3/4] activemq-artemis git commit: NO-JIRA use lambdas in tests for readability

Posted by cl...@apache.org.
NO-JIRA use lambdas in tests for readability


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/956c4c64
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/956c4c64
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/956c4c64

Branch: refs/heads/master
Commit: 956c4c64935ae08a3f37be37f10a132f8071c23a
Parents: dac6251
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Sep 15 11:16:44 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 25 11:00:56 2017 -0400

----------------------------------------------------------------------
 .../integration/mqtt/imported/MQTTTest.java     | 104 +++----------------
 .../activemq/artemis/tests/util/Wait.java       |   4 +-
 2 files changed, 16 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/956c4c64/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 8a1eea2..9338384 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -842,26 +842,14 @@ public class MQTTTest extends MQTTTestSupport {
       // publish non-retained message
       connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
 
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return publishList.size() == 2;
-         }
-      }, 5000);
-      assertEquals(2, publishList.size());
+      assertTrue(Wait.waitFor(() -> publishList.size() == 2, 5000));
 
       connection.disconnect();
 
       connection = mqtt.blockingConnection();
       connection.connect();
 
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return publishList.size() == 4;
-         }
-      }, 5000);
-      assertEquals(4, publishList.size());
+      assertTrue(Wait.waitFor(() -> publishList.size() == 4, 5000));
 
       // TODO Investigate if receiving the same ID for overlapping subscriptions is actually spec compliant.
       // In Artemis we send a new ID for every copy of the message.
@@ -1018,12 +1006,7 @@ public class MQTTTest extends MQTTTestSupport {
 
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      });
+      Wait.waitFor(() -> connection.isConnected());
 
       final String TOPIC = "TopicA";
       final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
@@ -1037,12 +1020,7 @@ public class MQTTTest extends MQTTTestSupport {
 
       final BlockingConnection newConnection = mqtt.blockingConnection();
       newConnection.connect();
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return newConnection.isConnected();
-         }
-      });
+      Wait.waitFor(() -> newConnection.isConnected());
 
       assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
       Message msg = newConnection.receive(1000, TimeUnit.MILLISECONDS);
@@ -1064,12 +1042,7 @@ public class MQTTTest extends MQTTTestSupport {
 
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      });
+      Wait.waitFor(() -> connection.isConnected());
 
       MQTT mqtt2 = createMQTTConnection("2", false);
       BlockingConnection connection2 = mqtt2.blockingConnection();
@@ -1098,12 +1071,7 @@ public class MQTTTest extends MQTTTestSupport {
 
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      });
+      Wait.waitFor(() -> connection.isConnected());
 
       // kill transport
       connection.kill();
@@ -1276,13 +1244,7 @@ public class MQTTTest extends MQTTTestSupport {
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
 
-      assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      }));
+      assertTrue("KeepAlive didn't work properly", Wait.waitFor(() -> connection.isConnected()));
 
       connection.disconnect();
    }
@@ -1299,13 +1261,7 @@ public class MQTTTest extends MQTTTestSupport {
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
 
-      assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      }));
+      assertTrue("KeepAlive didn't work properly", Wait.waitFor(() -> connection.isConnected()));
 
       connection.disconnect();
    }
@@ -1365,19 +1321,9 @@ public class MQTTTest extends MQTTTestSupport {
       final BlockingConnection connection1 = mqtt1.blockingConnection();
       connection1.connect();
 
-      assertTrue("Duplicate client disconnected", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection1.isConnected();
-         }
-      }));
+      assertTrue("Duplicate client disconnected", Wait.waitFor(() -> connection1.isConnected()));
 
-      assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return !connection.isConnected();
-         }
-      }));
+      assertTrue("Old client still connected", Wait.waitFor(() -> !connection.isConnected()));
 
       connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
       connection1.disconnect();
@@ -1399,20 +1345,10 @@ public class MQTTTest extends MQTTTestSupport {
          connection.connect();
          connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
 
-         assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisfied() throws Exception {
-               return connection.isConnected();
-            }
-         }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200)));
+         assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(() -> connection.isConnected(), 3000, 200));
 
          if (oldConnection.get() != null) {
-            assertTrue("Old client still connected on attempt: " + i, Wait.waitFor(new Wait.Condition() {
-               @Override
-               public boolean isSatisfied() throws Exception {
-                  return !oldConnection.get().isConnected();
-               }
-            }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200)));
+            assertTrue("Old client still connected on attempt: " + i, Wait.waitFor(() -> !oldConnection.get().isConnected(), 3000, 200));
          }
 
          oldConnection.set(connection);
@@ -1575,13 +1511,7 @@ public class MQTTTest extends MQTTTestSupport {
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
 
-      assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      }));
+      assertTrue("KeepAlive didn't work properly", Wait.waitFor(() -> connection.isConnected()));
    }
 
    @Test(timeout = 60 * 1000)
@@ -1773,13 +1703,7 @@ public class MQTTTest extends MQTTTestSupport {
       mqtt.setKeepAlive((short) 2);
       final BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
-      assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisfied() throws Exception {
-            return connection.isConnected();
-         }
-      }));
+      assertTrue("KeepAlive didn't work properly", Wait.waitFor(() -> connection.isConnected()));
 
       connection.disconnect();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/956c4c64/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
index 795a478..2f3772a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
@@ -40,10 +40,10 @@ public class Wait {
    }
 
    public static boolean waitFor(final Condition condition,
-                                 final long duration,
+                                 final long durationMillis,
                                  final long sleepMillis) throws Exception {
 
-      final long expiry = System.currentTimeMillis() + duration;
+      final long expiry = System.currentTimeMillis() + durationMillis;
       boolean conditionSatisified = condition.isSatisfied();
       while (!conditionSatisified && System.currentTimeMillis() < expiry) {
          TimeUnit.MILLISECONDS.sleep(sleepMillis);


[2/4] activemq-artemis git commit: ARTEMIS-1218 implement MQTT link stealing

Posted by cl...@apache.org.
ARTEMIS-1218 implement MQTT link stealing


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dac62517
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dac62517
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dac62517

Branch: refs/heads/master
Commit: dac625179ac95e4953be3cb2ae6e2a8513235b38
Parents: 144dbad
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Sep 15 10:59:57 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 25 11:00:56 2017 -0400

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTConnectionManager.java    | 19 +++--
 .../core/protocol/mqtt/MQTTProtocolManager.java | 24 ++++--
 .../integration/mqtt/imported/MQTTTest.java     | 77 +++++++++-----------
 3 files changed, 66 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dac62517/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 79b97a3..7e88028 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -143,8 +143,12 @@ public class MQTTConnectionManager {
          if (session.getSessionState() != null) {
             session.getSessionState().setAttached(false);
             String clientId = session.getSessionState().getClientId();
-            if (clientId != null) {
-               session.getProtocolManager().getConnectedClients().remove(clientId);
+            /**
+             *  ensure that the connection for the client ID matches *this* connection otherwise we could remove the
+             *  entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
+             */
+            if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) {
+               session.getProtocolManager().removeConnectedClient(clientId);
             }
          }
       }
@@ -176,12 +180,13 @@ public class MQTTConnectionManager {
             // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
             return null;
          }
-      } else if (!session.getProtocolManager().getConnectedClients().add(clientId)) {
-         // ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it.
-
+      } else {
+         MQTTConnection connection = session.getProtocolManager().addConnectedClient(clientId, session.getConnection());
 
-         // [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID
-         return null;
+         if (connection != null) {
+            // [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
+            connection.disconnect(false);
+         }
       }
       return clientId;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dac62517/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 8ee4033..c8832ba 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -19,7 +19,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -39,7 +40,6 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 
 /**
  * MQTTProtocolManager
@@ -55,7 +55,7 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
    private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
 
    //TODO Read in a list of existing client IDs from stored Sessions.
-   private Set<String> connectedClients = new ConcurrentHashSet<>();
+   private Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
 
    MQTTProtocolManager(ActiveMQServer server,
                        List<BaseInterceptor> incomingInterceptors,
@@ -178,7 +178,21 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
       super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
    }
 
-   public Set<String> getConnectedClients() {
-      return connectedClients;
+   public boolean isClientConnected(String clientId, MQTTConnection connection) {
+      return connectedClients.get(clientId).equals(connection);
+   }
+
+   public void removeConnectedClient(String clientId) {
+      connectedClients.remove(clientId);
+   }
+
+   /**
+    * @param clientId
+    * @param connection
+    * @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for
+    * the {@code clientId}
+    */
+   public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
+      return connectedClients.put(clientId, connection);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dac62517/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 9087938..8a1eea2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -56,7 +57,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.MQTTException;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
@@ -1350,11 +1350,8 @@ public class MQTTTest extends MQTTTestSupport {
       connection.disconnect();
    }
 
-   @Ignore
    @Test(timeout = 60 * 1000)
-   // TODO We currently do not support link stealing.  This needs to be enabled for this test to pass.
    public void testDuplicateClientId() throws Exception {
-      // test link stealing enabled by default
       final String clientId = "duplicateClient";
       MQTT mqtt = createMQTTConnection(clientId, false);
       mqtt.setKeepAlive((short) 2);
@@ -1384,31 +1381,45 @@ public class MQTTTest extends MQTTTestSupport {
 
       connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
       connection1.disconnect();
+   }
 
-      // disable link stealing
-      stopBroker();
-      protocolConfig = "allowLinkStealing=false";
-      startBroker();
+   @Test(timeout = 60 * 1000)
+   public void testRepeatedLinkStealing() throws Exception {
+      final String clientId = "duplicateClient";
+      final AtomicReference<BlockingConnection> oldConnection = new AtomicReference<>();
+      final String TOPICA = "TopicA";
 
-      mqtt = createMQTTConnection(clientId, false);
-      mqtt.setKeepAlive((short) 2);
-      final BlockingConnection connection2 = mqtt.blockingConnection();
-      connection2.connect();
-      connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+      for (int i = 1; i <= 10; ++i) {
 
-      mqtt1 = createMQTTConnection(clientId, false);
-      mqtt1.setKeepAlive((short) 2);
-      final BlockingConnection connection3 = mqtt1.blockingConnection();
-      try {
-         connection3.connect();
-         fail("Duplicate client connected");
-      } catch (Exception e) {
-         // ignore
+         LOG.info("Creating MQTT Connection {}", i);
+
+         MQTT mqtt = createMQTTConnection(clientId, false);
+         mqtt.setKeepAlive((short) 2);
+         final BlockingConnection connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+         assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+               return connection.isConnected();
+            }
+         }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200)));
+
+         if (oldConnection.get() != null) {
+            assertTrue("Old client still connected on attempt: " + i, Wait.waitFor(new Wait.Condition() {
+               @Override
+               public boolean isSatisfied() throws Exception {
+                  return !oldConnection.get().isConnected();
+               }
+            }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200)));
+         }
+
+         oldConnection.set(connection);
       }
 
-      assertTrue("Old client disconnected", connection2.isConnected());
-      connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
-      connection2.disconnect();
+      oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+      oldConnection.get().disconnect();
    }
 
    @Test(timeout = 30 * 10000)
@@ -1969,24 +1980,6 @@ public class MQTTTest extends MQTTTestSupport {
    }
 
    @Test
-   public void testDuplicateIDReturnsError() throws Exception {
-      String clientId = "clientId";
-      MQTT mqtt = createMQTTConnection();
-      mqtt.setClientId(clientId);
-      mqtt.blockingConnection().connect();
-
-      MQTTException e = null;
-      try {
-         MQTT mqtt2 = createMQTTConnection();
-         mqtt2.setClientId(clientId);
-         mqtt2.blockingConnection().connect();
-      } catch (MQTTException mqttE) {
-         e = mqttE;
-      }
-      assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
-   }
-
-   @Test
    public void testDoubleBroker() throws Exception {
       /*
        * Start two embedded server instances for MQTT and connect to them