You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/09 18:40:23 UTC

git commit: Apply fix and add test for: https://issues.apache.org/jira/browse/AMQ-5385

Repository: activemq
Updated Branches:
  refs/heads/trunk 97c127d2d -> 62c20ebdc


Apply fix and add test for:
https://issues.apache.org/jira/browse/AMQ-5385

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/62c20ebd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/62c20ebd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/62c20ebd

Branch: refs/heads/trunk
Commit: 62c20ebdcfcf4307a4379b39d8352891f8824d56
Parents: 97c127d
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 9 12:40:13 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 9 12:40:13 2014 -0400

----------------------------------------------------------------------
 .../activemq/broker/region/RegionBroker.java    | 30 +++++++-------
 .../activemq/transport/mqtt/MQTTTest.java       | 41 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/62c20ebd/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index fb7d69e..4ebcef5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -245,23 +245,23 @@ public class RegionBroker extends EmptyBroker {
         synchronized (clientIdSet) {
             ConnectionContext oldContext = clientIdSet.get(clientId);
             if (oldContext != null) {
-                if (context.isAllowLinkStealing()){
-                     clientIdSet.remove(clientId);
-                     if (oldContext.getConnection() != null) {
-                         Connection connection = oldContext.getConnection();
-                         LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
-                         if (connection instanceof TransportConnection){
+                if (context.isAllowLinkStealing()) {
+                    clientIdSet.put(clientId, context);
+                    if (oldContext.getConnection() != null) {
+                        Connection connection = oldContext.getConnection();
+                        LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
+                        if (connection instanceof TransportConnection) {
                             TransportConnection transportConnection = (TransportConnection) connection;
-                             transportConnection.stopAsync();
-                         }else{
-                             connection.stop();
-                         }
-                     }else{
-                         LOG.error("Not Connection for {}", oldContext);
-                     }
-                }else{
+                            transportConnection.stopAsync();
+                        } else {
+                            connection.stop();
+                        }
+                    } else {
+                        LOG.error("No Connection found for {}", oldContext);
+                    }
+                } else {
                     throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
-                            + oldContext.getConnection().getRemoteAddress());
+                        + oldContext.getConnection().getRemoteAddress());
                 }
             } else {
                 clientIdSet.put(clientId, context);

http://git-wip-us.apache.org/repos/asf/activemq/blob/62c20ebd/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 1586ff4..32f8167 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -34,6 +34,7 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import javax.jms.BytesMessage;
@@ -1227,6 +1228,46 @@ public class MQTTTest extends MQTTTestSupport {
         connection2.disconnect();
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testRepeatedLinkStealing() throws Exception {
+        final String clientId = "duplicateClient";
+        final AtomicReference<BlockingConnection> oldConnection = new AtomicReference<BlockingConnection>();
+        final String TOPICA = "TopicA";
+
+        for (int i = 1; i <= 10; ++i) {
+
+            LOG.info("Creating MQTT Connection {}", i);
+
+            MQTT mqtt = createMQTTConnection(clientId, false);
+            mqtt.setKeepAlive((short) 2);
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+            assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connection.isConnected();
+                }
+            }));
+
+            if (oldConnection.get() != null) {
+
+                assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return !oldConnection.get().isConnected();
+                    }
+                }));
+            }
+
+            oldConnection.set(connection);
+        }
+
+        oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+        oldConnection.get().disconnect();
+    }
+
     @Test(timeout = 30 * 10000)
     public void testJmsMapping() throws Exception {
         doTestJmsMapping("test.foo");