You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/05 18:19:42 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6346
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x 7ddfa97d0 -> fda982dcc
https://issues.apache.org/jira/browse/AMQ-6346
Prevent concurrent access to the MQTT protocol handlers which can lead
to a transport level deadlock
(cherry picked from commit 96494f74c7142c3396f17696f345c2355c16a61c)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1dfd0eeb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1dfd0eeb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1dfd0eeb
Branch: refs/heads/activemq-5.13.x
Commit: 1dfd0eeb60921243bec8245edb16c07ca98c1857
Parents: 7ddfa97
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 5 17:47:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 5 18:18:14 2016 +0000
----------------------------------------------------------------------
.../activemq/transport/ws/AbstractMQTTSocket.java | 12 +++++++++++-
.../activemq/transport/ws/jetty9/MQTTSocket.java | 17 ++++++++++++++---
2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/1dfd0eeb/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
index 65d12c2..c293811 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -34,6 +35,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
+ protected ReentrantLock protocolLock = new ReentrantLock();
protected volatile MQTTProtocolConverter protocolConverter = null;
protected MQTTWireFormat wireFormat = new MQTTWireFormat();
protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
@@ -50,16 +52,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
@Override
public void oneway(Object command) throws IOException {
+ protocolLock.lock();
try {
getProtocolConverter().onActiveMQCommand((Command)command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
}
}
@Override
public void sendToActiveMQ(Command command) {
- doConsume(command);
+ protocolLock.lock();
+ try {
+ doConsume(command);
+ } finally {
+ protocolLock.unlock();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/1dfd0eeb/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index b2dd9be..f19e4d2 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.ws.AbstractMQTTSocket;
import org.apache.activemq.util.ByteSequence;
@@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
+ private final int ORDERLY_CLOSE_TIMEOUT = 10;
private Session session;
public MQTTSocket(String remoteAddress) {
@@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
}
}
- receiveCounter += length;
-
+ protocolLock.lock();
try {
+ receiveCounter += length;
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
getProtocolConverter().onMQTTCommand(frame);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
}
}
@Override
public void onWebSocketClose(int arg0, String arg1) {
try {
- getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+ if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
+ getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+ }
} catch (Exception e) {
LOG.warn("Failed to close WebSocket", e);
+ } finally {
+ if (protocolLock.isHeldByCurrentThread()) {
+ protocolLock.unlock();
+ }
}
}
[2/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6343
Posted by cs...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6343
On MQTT Websocket close, a LWT message will be properly sent if
configured and a disconnect packet was not received
(cherry picked from commit bd442a3388b0f127c8f8b9fad5e4888b77bb42c3)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fda982dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fda982dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fda982dc
Branch: refs/heads/activemq-5.13.x
Commit: fda982dccb12098a84e3de9cd951108be1a8167f
Parents: 1dfd0ee
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 5 17:47:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 5 18:19:24 2016 +0000
----------------------------------------------------------------------
.../transport/ws/jetty9/MQTTSocket.java | 9 +
.../activemq/transport/ws/MQTTWSConnection.java | 8 +-
.../transport/ws/MQTTWSTransportWillTest.java | 251 +++++++++++++++++++
.../transport/ws/WSTransportTestSupport.java | 5 +-
.../transport/mqtt/MQTTProtocolConverter.java | 3 +-
5 files changed, 269 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fda982dc/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index f19e4d2..6198bf4 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.ws.AbstractMQTTSocket;
import org.apache.activemq.util.ByteSequence;
@@ -36,6 +37,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
private final int ORDERLY_CLOSE_TIMEOUT = 10;
private Session session;
+ final AtomicBoolean receivedDisconnect = new AtomicBoolean();
public MQTTSocket(String remoteAddress) {
super(remoteAddress);
@@ -71,6 +73,9 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
try {
receiveCounter += length;
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
+ if (frame.messageType() == DISCONNECT.TYPE) {
+ receivedDisconnect.set(true);
+ }
getProtocolConverter().onMQTTCommand(frame);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
@@ -84,6 +89,10 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
try {
if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
+ //Check if we received a disconnect packet before closing
+ if (!receivedDisconnect.get()) {
+ getProtocolConverter().onTransportError();
+ }
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/fda982dc/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
index fdbf867..2485f04 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
@@ -85,14 +85,16 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
}
public void connect(String clientId) throws Exception {
- checkConnected();
-
CONNECT command = new CONNECT();
-
command.clientId(new UTF8Buffer(clientId));
command.cleanSession(false);
command.version(3);
command.keepAlive((short) 0);
+ connect(command);
+ }
+
+ public void connect(CONNECT command) throws Exception {
+ checkConnected();
ByteSequence payload = wireFormat.marshal(command.encode());
connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data));
http://git-wip-us.apache.org/repos/asf/activemq/blob/fda982dc/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java
new file mode 100644
index 0000000..6d05ab8
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * This shows that last will and testament messages work with MQTT over WS.
+ * This test is modeled after org.apache.activemq.transport.mqtt.MQTTWillTest
+ */
+@RunWith(Parameterized.class)
+public class MQTTWSTransportWillTest extends WSTransportTestSupport {
+
+ protected WebSocketClient wsClient;
+ protected MQTTWSConnection wsMQTTConnection1;
+ protected MQTTWSConnection wsMQTTConnection2;
+ protected ClientUpgradeRequest request;
+
+ private String willTopic = "willTopic";
+ private String payload = "last will";
+ private boolean closeWithDisconnect;
+
+ //Test both with a proper disconnect and without
+ @Parameters(name="closeWithDisconnect={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {true},
+ {false}
+ });
+ }
+
+ public MQTTWSTransportWillTest(boolean closeWithDisconnect) {
+ this.closeWithDisconnect = closeWithDisconnect;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ //turn off advisory support
+ broker = createBroker(true, false);
+
+ wsClient = new WebSocketClient(new SslContextFactory(true));
+ wsClient.start();
+
+ request = new ClientUpgradeRequest();
+ request.setSubProtocols("mqttv3.1");
+
+ wsMQTTConnection1 = new MQTTWSConnection();
+ wsMQTTConnection2 = new MQTTWSConnection();
+
+ wsClient.connect(wsMQTTConnection1, wsConnectUri, request);
+ if (!wsMQTTConnection1.awaitConnection(30, TimeUnit.SECONDS)) {
+ throw new IOException("Could not connect to MQTT WS endpoint");
+ }
+
+ wsClient.connect(wsMQTTConnection2, wsConnectUri, request);
+ if (!wsMQTTConnection2.awaitConnection(30, TimeUnit.SECONDS)) {
+ throw new IOException("Could not connect to MQTT WS endpoint");
+ }
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ if (wsMQTTConnection1 != null) {
+ wsMQTTConnection1.close();
+ wsMQTTConnection1 = null;
+ }
+ if (wsMQTTConnection2 != null) {
+ wsMQTTConnection2.close();
+ wsMQTTConnection2 = null;
+ }
+ wsClient.stop();
+ wsClient = null;
+ super.tearDown();
+ }
+
+ @Test(timeout = 60000)
+ public void testWill() throws Exception {
+
+ //connect with will retain false
+ CONNECT command = getWillConnectCommand(false);
+
+ //connect both connections
+ wsMQTTConnection1.connect(command);
+ wsMQTTConnection2.connect();
+
+ //Subscribe to topics
+ SUBSCRIBE subscribe = new SUBSCRIBE();
+ subscribe.topics(new Topic[] {new Topic("#", QoS.EXACTLY_ONCE) });
+ wsMQTTConnection2.sendFrame(subscribe.encode());
+ wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
+
+ //Test message send/receive
+ wsMQTTConnection1.sendFrame(getTestMessage((short) 125).encode());
+ assertMessageReceived(wsMQTTConnection2);
+
+ //close the first connection without sending a proper disconnect frame first
+ //if closeWithDisconnect is false
+ if (closeWithDisconnect) {
+ wsMQTTConnection1.disconnect();
+ }
+ wsMQTTConnection1.close();
+
+ //Make sure LWT message is not received
+ if (closeWithDisconnect) {
+ assertNull(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+ //make sure LWT is received
+ } else {
+ assertWillTopicReceived(wsMQTTConnection2);
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testRetainWillMessage() throws Exception {
+
+ //create connection with will retain true
+ CONNECT command = getWillConnectCommand(true);
+
+ wsMQTTConnection1.connect(command);
+ wsMQTTConnection2.connect();
+
+ //set to at most once to test will retain
+ SUBSCRIBE subscribe = new SUBSCRIBE();
+ subscribe.topics(new Topic[] {new Topic("#", QoS.AT_MOST_ONCE) });
+ wsMQTTConnection2.sendFrame(subscribe.encode());
+ wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
+
+ //Test message send/receive
+ PUBLISH pub = getTestMessage((short) 127);
+ wsMQTTConnection1.sendFrame(pub.encode());
+ assertMessageReceived(wsMQTTConnection2);
+ PUBACK ack = new PUBACK();
+ ack.messageId(pub.messageId());
+ wsMQTTConnection2.sendFrame(ack.encode());
+
+ //Properly close connection 2 and improperly close connection 1 for LWT test
+ wsMQTTConnection2.disconnect();
+ wsMQTTConnection2.close();
+ Thread.sleep(1000);
+ //close the first connection without sending a proper disconnect frame first
+ //if closeWithDisconnect is false
+ if (closeWithDisconnect) {
+ wsMQTTConnection1.disconnect();
+ }
+ wsMQTTConnection1.close();
+ Thread.sleep(1000);
+
+ //Do the reconnect of the websocket after close
+ wsMQTTConnection2 = new MQTTWSConnection();
+ wsClient.connect(wsMQTTConnection2, wsConnectUri, request);
+ if (!wsMQTTConnection2.awaitConnection(30, TimeUnit.SECONDS)) {
+ throw new IOException("Could not connect to MQTT WS endpoint");
+ }
+
+
+ //Make sure the will message is received on reconnect
+ wsMQTTConnection2.connect();
+ wsMQTTConnection2.sendFrame(subscribe.encode());
+ wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
+
+ //Make sure LWT message not received
+ if (closeWithDisconnect) {
+ assertNull(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+ //make sure LWT is received
+ } else {
+ assertWillTopicReceived(wsMQTTConnection2);
+ }
+ }
+
+ private PUBLISH getTestMessage(short id) {
+ PUBLISH publish = new PUBLISH();
+ publish.dup(false);
+ publish.messageId(id);
+ publish.qos(QoS.AT_LEAST_ONCE);
+ publish.payload(new Buffer("hello world".getBytes()));
+ publish.topicName(new UTF8Buffer("test"));
+
+ return publish;
+ }
+
+ private CONNECT getWillConnectCommand(boolean willRetain) {
+ CONNECT command = new CONNECT();
+ command.clientId(new UTF8Buffer("clientId"));
+ command.cleanSession(false);
+ command.version(3);
+ command.keepAlive((short) 0);
+ command.willMessage(new UTF8Buffer(payload));
+ command.willQos(QoS.AT_LEAST_ONCE);
+ command.willTopic(new UTF8Buffer(willTopic));
+ command.willRetain(willRetain);
+
+ return command;
+ }
+
+ private void assertMessageReceived(MQTTWSConnection wsMQTTConnection2) throws Exception {
+ PUBLISH msg = new PUBLISH();
+ msg.decode(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+ assertNotNull(msg);
+ assertEquals("hello world", msg.payload().ascii().toString());
+ assertEquals("test", msg.topicName().toString());
+ }
+
+ private void assertWillTopicReceived(MQTTWSConnection wsMQTTConnection2) throws Exception {
+ PUBLISH willMsg = new PUBLISH();
+ willMsg.decode(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+ assertNotNull(willMsg);
+ assertEquals(payload, willMsg.payload().ascii().toString());
+ assertEquals(willTopic, willMsg.topicName().toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fda982dc/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
index edf7b6c..3a38cc9 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -55,7 +55,7 @@ public class WSTransportTestSupport {
@Before
public void setUp() throws Exception {
LOG.info("========== Starting test: {} ==========", name.getMethodName());
- broker = createBroker(true);
+ broker = createBroker(true, true);
}
@After
@@ -91,7 +91,7 @@ public class WSTransportTestSupport {
}
- protected BrokerService createBroker(boolean deleteMessages) throws Exception {
+ protected BrokerService createBroker(boolean deleteMessages, boolean advisorySupport) throws Exception {
BrokerService broker = new BrokerService();
@@ -105,6 +105,7 @@ public class WSTransportTestSupport {
wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI();
+ broker.setAdvisorySupport(advisorySupport);
broker.setUseJmx(true);
broker.getManagementContext().setCreateConnector(false);
broker.setPersistent(isPersistent());
http://git-wip-us.apache.org/repos/asf/activemq/blob/fda982dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 2b384e1..01a306e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -339,8 +339,7 @@ public class MQTTProtocolConverter {
}
void onMQTTDisconnect() throws MQTTProtocolException {
- if (connected.get()) {
- connected.set(false);
+ if (connected.compareAndSet(true, false)) {
sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
sendToActiveMQ(new ShutdownInfo(), null);
}