You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/19 23:41:39 UTC

svn commit: r1400304 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt: MQTTInactivityMonitor.java MQTTProtocolConverter.java MQTTWireFormat.java

Author: tabish
Date: Fri Oct 19 21:41:39 2012
New Revision: 1400304

URL: http://svn.apache.org/viewvc?rev=1400304&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4117

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1400304&r1=1400303&r2=1400304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Fri Oct 19 21:41:39 2012
@@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.thread.SchedulerTimerTask;
@@ -42,22 +42,19 @@ public class MQTTInactivityMonitor exten
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
 
+    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
+
     private static ThreadPoolExecutor ASYNC_TASKS;
     private static int CHECKER_COUNTER;
-    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
     private static Timer READ_CHECK_TIMER;
 
     private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
-
-    private final AtomicBoolean commandSent = new AtomicBoolean(false);
-    private final AtomicBoolean inSend = new AtomicBoolean(false);
     private final AtomicBoolean failed = new AtomicBoolean(false);
-
     private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
     private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
 
-    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
+    private final ReentrantLock sendLock = new ReentrantLock();
     private SchedulerTimerTask readCheckerTask;
 
     private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
@@ -65,7 +62,6 @@ public class MQTTInactivityMonitor exten
     private boolean keepAliveResponseRequired;
     private MQTTProtocolConverter protocolConverter;
 
-
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
 
@@ -96,7 +92,6 @@ public class MQTTInactivityMonitor exten
         return elapsed > (readCheckTime * 9 / 10);
     }
 
-
     public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
         super(next);
     }
@@ -111,7 +106,6 @@ public class MQTTInactivityMonitor exten
         next.stop();
     }
 
-
     final void readCheck() {
         int currentCounter = next.getReceiveCounter();
         int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
@@ -132,8 +126,6 @@ public class MQTTInactivityMonitor exten
                     }
                     onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
                 }
-
-                ;
             });
         } else {
             if (LOG.isTraceEnabled()) {
@@ -143,7 +135,6 @@ public class MQTTInactivityMonitor exten
         commandReceived.set(false);
     }
 
-
     public void onCommand(Object command) {
         commandReceived.set(true);
         inReceive.set(true);
@@ -151,14 +142,14 @@ public class MQTTInactivityMonitor exten
             if (command.getClass() == KeepAliveInfo.class) {
                 KeepAliveInfo info = (KeepAliveInfo) command;
                 if (info.isResponseRequired()) {
-                    sendLock.readLock().lock();
+                    sendLock.lock();
                     try {
                         info.setResponseRequired(false);
                         oneway(info);
                     } catch (IOException e) {
                         onException(e);
                     } finally {
-                        sendLock.readLock().unlock();
+                        sendLock.unlock();
                     }
                 }
             } else {
@@ -171,17 +162,12 @@ public class MQTTInactivityMonitor exten
 
     public void oneway(Object o) throws IOException {
         // To prevent the inactivity monitor from sending a message while we
-        // are performing a send we take a read lock.  The inactivity monitor
-        // sends its Heart-beat commands under a write lock.  This means that
-        // the MutexTransport is still responsible for synchronizing sends
-        this.sendLock.readLock().lock();
-        inSend.set(true);
+        // are performing a send we take the lock.
+        this.sendLock.lock();
         try {
             doOnewaySend(o);
         } finally {
-            commandSent.set(true);
-            inSend.set(false);
-            this.sendLock.readLock().unlock();
+            this.sendLock.unlock();
         }
     }
 
@@ -200,7 +186,6 @@ public class MQTTInactivityMonitor exten
         }
     }
 
-
     public long getReadCheckTime() {
         return readCheckTime;
     }
@@ -209,7 +194,6 @@ public class MQTTInactivityMonitor exten
         this.readCheckTime = readCheckTime;
     }
 
-
     public long getInitialDelayTime() {
         return initialDelayTime;
     }
@@ -239,16 +223,20 @@ public class MQTTInactivityMonitor exten
     }
 
     synchronized void startMonitorThread() {
-        if (monitorStarted.get()) {
+
+        // Not yet configured if this isn't set yet.
+        if (protocolConverter == null) {
             return;
         }
 
+        if (monitorStarted.get()) {
+            return;
+        }
 
         if (readCheckTime > 0) {
             readCheckerTask = new SchedulerTimerTask(readChecker);
         }
 
-
         if (readCheckTime > 0) {
             monitorStarted.set(true);
             synchronized (AbstractInactivityMonitor.class) {
@@ -264,7 +252,6 @@ public class MQTTInactivityMonitor exten
         }
     }
 
-
     synchronized void stopMonitorThread() {
         if (monitorStarted.compareAndSet(true, false)) {
             if (readCheckerTask != null) {
@@ -293,9 +280,8 @@ public class MQTTInactivityMonitor exten
     };
 
     private ThreadPoolExecutor createExecutor() {
-        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
         exec.allowCoreThreadTimeOut(true);
         return exec;
     }
 }
-

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1400304&r1=1400303&r2=1400304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Fri Oct 19 21:41:39 2012
@@ -26,8 +26,30 @@ import java.util.zip.Inflater;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+
 import org.apache.activemq.broker.BrokerContext;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -38,7 +60,21 @@ import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.*;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBACK;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +85,6 @@ class MQTTProtocolConverter {
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
 
-
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
     private final ProducerId producerId = new ProducerId(sessionId, 1);
@@ -83,7 +118,6 @@ class MQTTProtocolConverter {
         }
     }
 
-
     void sendToActiveMQ(Command command, ResponseHandler handler) {
         command.setCommandId(generateCommandId());
         if (handler != null) {
@@ -101,13 +135,11 @@ class MQTTProtocolConverter {
         }
     }
 
-
     /**
      * Convert a MQTT command
      */
     public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
 
-
         switch (frame.messageType()) {
             case PINGREQ.TYPE: {
                 mqttTransport.sendToMQTT(PING_RESP_FRAME);
@@ -152,13 +184,12 @@ class MQTTProtocolConverter {
                 onMQTTPubComp(new PUBCOMP().decode(frame));
                 break;
             }
-            default:
+            default: {
                 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
+            }
         }
-
     }
 
-
     void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
 
         if (connected.get()) {
@@ -182,7 +213,6 @@ class MQTTProtocolConverter {
 
         configureInactivityMonitor(connect.keepAlive());
 
-
         connectionInfo.setConnectionId(connectionId);
         if (clientId != null && !clientId.isEmpty()) {
             connectionInfo.setClientId(clientId);
@@ -232,14 +262,12 @@ class MQTTProtocolConverter {
 
                     }
                 });
-
             }
         });
     }
 
     void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
         checkConnected();
-        SUBACK result = new SUBACK();
         Topic[] topics = command.topics();
         if (topics != null) {
             byte[] qos = new byte[topics.length];
@@ -261,9 +289,6 @@ class MQTTProtocolConverter {
 
     QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
         ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
-        if (destination == null) {
-            throw new MQTTProtocolException("Invalid Destination.");
-        }
 
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
@@ -277,7 +302,6 @@ class MQTTProtocolConverter {
 
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
 
-
         subscriptionsByConsumerId.put(id, mqttSubscription);
         mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
 
@@ -295,7 +319,6 @@ class MQTTProtocolConverter {
         UNSUBACK ack = new UNSUBACK();
         ack.messageId(command.messageId());
         sendToMQTT(ack.encode());
-
     }
 
     void onUnSubscribe(UTF8Buffer topicName) {
@@ -310,12 +333,9 @@ class MQTTProtocolConverter {
         }
     }
 
-
     /**
      * Dispatch a ActiveMQ command
      */
-
-
     public void onActiveMQCommand(Command command) throws Exception {
         if (command.isResponse()) {
             Response response = (Response) command;
@@ -406,7 +426,6 @@ class MQTTProtocolConverter {
         }
     }
 
-
     ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
         ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
 
@@ -456,43 +475,37 @@ class MQTTProtocolConverter {
         }
         result.topicName(topicName);
 
-
         if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
-                ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
-                msg.setReadOnlyBody(true);
-                String messageText = msg.getText();
-                if (messageText != null) {
-                    result.payload(new Buffer(messageText.getBytes("UTF-8")));
-                }
-
-
+            ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+            msg.setReadOnlyBody(true);
+            String messageText = msg.getText();
+            if (messageText != null) {
+                result.payload(new Buffer(messageText.getBytes("UTF-8")));
+            }
         } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
-
             ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
             msg.setReadOnlyBody(true);
             byte[] data = new byte[(int) msg.getBodyLength()];
             msg.readBytes(data);
             result.payload(new Buffer(data));
-        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){
+        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
             ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
             msg.setReadOnlyBody(true);
-            Map map = msg.getContentMap();
-            if (map != null){
+            Map<String, Object> map = msg.getContentMap();
+            if (map != null) {
                 result.payload(new Buffer(map.toString().getBytes("UTF-8")));
             }
-        }
-
-        else {
+        } else {
             ByteSequence byteSequence = message.getContent();
             if (byteSequence != null && byteSequence.getLength() > 0) {
-                if (message.isCompressed()){
+                if (message.isCompressed()) {
                     Inflater inflater = new Inflater();
-                    inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length);
-                    byte[]  data = new byte[4096];
+                    inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
+                    byte[] data = new byte[4096];
                     int read;
                     ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                    while((read = inflater.inflate(data)) != 0){
-                       bytesOut.write(data,0,read);
+                    while ((read = inflater.inflate(data)) != 0) {
+                        bytesOut.write(data, 0, read);
                     }
                     byteSequence = bytesOut.toByteSequence();
                 }
@@ -502,7 +515,6 @@ class MQTTProtocolConverter {
         return result;
     }
 
-
     public MQTTTransport getMQTTTransport() {
         return mqttTransport;
     }
@@ -522,22 +534,18 @@ class MQTTProtocolConverter {
                 } catch (Exception e) {
                     LOG.warn("Failed to publish Will Message " + connect.willMessage());
                 }
-
             }
         }
     }
 
-
     void configureInactivityMonitor(short heartBeat) {
         try {
-
             int heartBeatMS = heartBeat * 1000;
             MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
             monitor.setProtocolConverter(this);
             monitor.setReadCheckTime(heartBeatMS);
             monitor.setInitialDelayTime(heartBeatMS);
             monitor.startMonitorThread();
-
         } catch (Exception ex) {
             LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
         }
@@ -545,7 +553,6 @@ class MQTTProtocolConverter {
         LOG.debug(getClientId() + " MQTT Connection using heart beat of  " + heartBeat + " secs");
     }
 
-
     void handleException(Throwable exception, MQTTFrame command) {
         LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
         if (LOG.isDebugEnabled()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?rev=1400304&r1=1400303&r2=1400304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java Fri Oct 19 21:41:39 2012
@@ -35,10 +35,8 @@ import org.fusesource.mqtt.codec.MQTTFra
  */
 public class MQTTWireFormat implements WireFormat {
 
-
     static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
 
-    private boolean encodingEnabled = false;
     private int version = 1;
 
     public ByteSequence marshal(Object command) throws IOException {
@@ -119,6 +117,4 @@ public class MQTTWireFormat implements W
     public int getVersion() {
         return this.version;
     }
-
-
 }