You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/05 18:01:58 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6346

Repository: activemq
Updated Branches:
  refs/heads/master 30ff378a3 -> 96494f74c


https://issues.apache.org/jira/browse/AMQ-6346

Prevent concurrent access to the MQTT protocol handlers, which can lead
to a transport-level deadlock


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96494f74
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96494f74
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96494f74

Branch: refs/heads/master
Commit: 96494f74c7142c3396f17696f345c2355c16a61c
Parents: 30ff378
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 5 17:47:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 5 17:50:03 2016 +0000

----------------------------------------------------------------------
 .../activemq/transport/ws/AbstractMQTTSocket.java  | 12 +++++++++++-
 .../activemq/transport/ws/jetty9/MQTTSocket.java   | 17 ++++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/96494f74/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
index fe0bd32..48282d9 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -36,6 +37,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
 
 public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
 
+    protected ReentrantLock protocolLock = new ReentrantLock();
     protected volatile MQTTProtocolConverter protocolConverter = null;
     protected MQTTWireFormat wireFormat = new MQTTWireFormat();
     protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
@@ -53,16 +55,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
 
     @Override
     public void oneway(Object command) throws IOException {
+        protocolLock.lock();
         try {
             getProtocolConverter().onActiveMQCommand((Command)command);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
         }
     }
 
     @Override
     public void sendToActiveMQ(Command command) {
-        doConsume(command);
+        protocolLock.lock();
+        try {
+            doConsume(command);
+        } finally {
+            protocolLock.unlock();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/96494f74/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index 8e0c416..53bad07 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.transport.ws.AbstractMQTTSocket;
 import org.apache.activemq.util.ByteSequence;
@@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
 
+    private final int ORDERLY_CLOSE_TIMEOUT = 10;
     private Session session;
 
     public MQTTSocket(String remoteAddress) {
@@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
             }
         }
 
-        receiveCounter += length;
-
+        protocolLock.lock();
         try {
+            receiveCounter += length;
             MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
             getProtocolConverter().onMQTTCommand(frame);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
         }
     }
 
     @Override
     public void onWebSocketClose(int arg0, String arg1) {
         try {
-            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+            if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
+                LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
+                getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+            }
         } catch (Exception e) {
             LOG.debug("Failed to close MQTT WebSocket cleanly", e);
+        } finally {
+            if (protocolLock.isHeldByCurrentThread()) {
+                protocolLock.unlock();
+            }
         }
     }