You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/05 18:01:58 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6346
Repository: activemq
Updated Branches:
refs/heads/master 30ff378a3 -> 96494f74c
https://issues.apache.org/jira/browse/AMQ-6346
Prevent concurrent access to the MQTT protocol handlers, which can lead
to a transport-level deadlock
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96494f74
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96494f74
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96494f74
Branch: refs/heads/master
Commit: 96494f74c7142c3396f17696f345c2355c16a61c
Parents: 30ff378
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 5 17:47:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 5 17:50:03 2016 +0000
----------------------------------------------------------------------
.../activemq/transport/ws/AbstractMQTTSocket.java | 12 +++++++++++-
.../activemq/transport/ws/jetty9/MQTTSocket.java | 17 ++++++++++++++---
2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/96494f74/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
index fe0bd32..48282d9 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -36,6 +37,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
+ protected ReentrantLock protocolLock = new ReentrantLock();
protected volatile MQTTProtocolConverter protocolConverter = null;
protected MQTTWireFormat wireFormat = new MQTTWireFormat();
protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
@@ -53,16 +55,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
@Override
public void oneway(Object command) throws IOException {
+ protocolLock.lock();
try {
getProtocolConverter().onActiveMQCommand((Command)command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
}
}
@Override
public void sendToActiveMQ(Command command) {
- doConsume(command);
+ protocolLock.lock();
+ try {
+ doConsume(command);
+ } finally {
+ protocolLock.unlock();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/96494f74/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index 8e0c416..53bad07 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.ws.AbstractMQTTSocket;
import org.apache.activemq.util.ByteSequence;
@@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
+ private final int ORDERLY_CLOSE_TIMEOUT = 10;
private Session session;
public MQTTSocket(String remoteAddress) {
@@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
}
}
- receiveCounter += length;
-
+ protocolLock.lock();
try {
+ receiveCounter += length;
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
getProtocolConverter().onMQTTCommand(frame);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
}
}
@Override
public void onWebSocketClose(int arg0, String arg1) {
try {
- getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+ if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
+ getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+ }
} catch (Exception e) {
LOG.debug("Failed to close MQTT WebSocket cleanly", e);
+ } finally {
+ if (protocolLock.isHeldByCurrentThread()) {
+ protocolLock.unlock();
+ }
}
}