You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/09 18:40:23 UTC
git commit: Apply fix and add test for:
https://issues.apache.org/jira/browse/AMQ-5385
Repository: activemq
Updated Branches:
refs/heads/trunk 97c127d2d -> 62c20ebdc
Apply fix and add test for:
https://issues.apache.org/jira/browse/AMQ-5385
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/62c20ebd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/62c20ebd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/62c20ebd
Branch: refs/heads/trunk
Commit: 62c20ebdcfcf4307a4379b39d8352891f8824d56
Parents: 97c127d
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 9 12:40:13 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 9 12:40:13 2014 -0400
----------------------------------------------------------------------
.../activemq/broker/region/RegionBroker.java | 30 +++++++-------
.../activemq/transport/mqtt/MQTTTest.java | 41 ++++++++++++++++++++
2 files changed, 56 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/62c20ebd/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index fb7d69e..4ebcef5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -245,23 +245,23 @@ public class RegionBroker extends EmptyBroker {
synchronized (clientIdSet) {
ConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) {
- if (context.isAllowLinkStealing()){
- clientIdSet.remove(clientId);
- if (oldContext.getConnection() != null) {
- Connection connection = oldContext.getConnection();
- LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
- if (connection instanceof TransportConnection){
+ if (context.isAllowLinkStealing()) {
+ clientIdSet.put(clientId, context);
+ if (oldContext.getConnection() != null) {
+ Connection connection = oldContext.getConnection();
+ LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
+ if (connection instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) connection;
- transportConnection.stopAsync();
- }else{
- connection.stop();
- }
- }else{
- LOG.error("Not Connection for {}", oldContext);
- }
- }else{
+ transportConnection.stopAsync();
+ } else {
+ connection.stop();
+ }
+ } else {
+ LOG.error("No Connection found for {}", oldContext);
+ }
+ } else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
- + oldContext.getConnection().getRemoteAddress());
+ + oldContext.getConnection().getRemoteAddress());
}
} else {
clientIdSet.put(clientId, context);
http://git-wip-us.apache.org/repos/asf/activemq/blob/62c20ebd/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 1586ff4..32f8167 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -34,6 +34,7 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
@@ -1227,6 +1228,46 @@ public class MQTTTest extends MQTTTestSupport {
connection2.disconnect();
}
+ @Test(timeout = 60 * 1000)
+ public void testRepeatedLinkStealing() throws Exception {
+ final String clientId = "duplicateClient";
+ final AtomicReference<BlockingConnection> oldConnection = new AtomicReference<BlockingConnection>();
+ final String TOPICA = "TopicA";
+
+ for (int i = 1; i <= 10; ++i) {
+
+ LOG.info("Creating MQTT Connection {}", i);
+
+ MQTT mqtt = createMQTTConnection(clientId, false);
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+ assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return connection.isConnected();
+ }
+ }));
+
+ if (oldConnection.get() != null) {
+
+ assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !oldConnection.get().isConnected();
+ }
+ }));
+ }
+
+ oldConnection.set(connection);
+ }
+
+ oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+ oldConnection.get().disconnect();
+ }
+
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
doTestJmsMapping("test.foo");