You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/11 13:52:07 UTC

svn commit: r1324714 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/mqtt/ main/resources/META-INF/services/org/apache/activemq/transport/ test/java/org/apache/activemq/transport/mqtt/

Author: rajdavies
Date: Wed Apr 11 11:52:06 2012
New Revision: 1324714

URL: http://svn.apache.org/viewvc?rev=1324714&view=rev
Log:
added NIO support to the  MQTT protocol for https://issues.apache.org/jira/browse/AMQ-3786

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl
      - copied, changed from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java Wed Apr 11 11:52:06 2012
@@ -14,89 +14,148 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.DataByteArrayInputStream;
+import java.io.IOException;
 
-import java.io.ByteArrayInputStream;
-import java.util.HashMap;
+import javax.jms.JMSException;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.mqtt.codec.*;
 
-public class StompCodec {
+public class MQTTCodec {
 
-    final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
     TcpTransport transport;
 
-    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
-    boolean processedHeaders = false;
+    DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
+    boolean processedHeader = false;
     String action;
-    HashMap<String, String> headers;
+    byte header;
     int contentLength = -1;
-    int readLength = 0;
     int previousByte = -1;
+    int payLoadRead = 0;
 
-    public StompCodec(TcpTransport transport) {
+    public MQTTCodec(TcpTransport transport) {
         this.transport = transport;
     }
 
-    public void parse(ByteArrayInputStream input, int readSize) throws Exception {
-       int i = 0;
-       int b;
-       while(i++ < readSize) {
-           b = input.read();
-           // skip repeating nulls
-           if (!processedHeaders && previousByte == 0 && b == 0) {
-               continue;
-           }
-
-           if (!processedHeaders) {
-               currentCommand.write(b);
-               // end of headers section, parse action and header
-               if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
-                   if (transport.getWireFormat() instanceof StompWireFormat) {
-                       DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
-                       action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
-                       headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
-                       String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
-                       if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
-                           contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
-                       } else {
-                           contentLength = -1;
-                       }
-                   }
-                   processedHeaders = true;
-                   currentCommand.reset();
-               }
-           } else {
-
-               if (contentLength == -1) {
-                   // end of command reached, unmarshal
-                   if (b == 0) {
-                       processCommand();
-                   } else {
-                       currentCommand.write(b);
-                   }
-               } else {
-                   // read desired content length
-                   if (readLength++ == contentLength) {
-                       processCommand();
-                       readLength = 0;
-                   } else {
-                       currentCommand.write(b);
-                   }
-               }
-           }
+    public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
+        int i = 0;
+        byte b;
+        while (i++ < readSize) {
+            b = input.readByte();
+            // skip repeating nulls
+            if (!processedHeader && b == 0) {
+                previousByte = 0;
+                continue;
+            }
+
+            if (!processedHeader) {
+                i += processHeader(b, input);
+                if (contentLength == 0) {
+                    processCommand();
+                }
+
+            } else {
+
+                if (contentLength == -1) {
+                    // end of command reached, unmarshal
+                    if (b == 0) {
+                        processCommand();
+                    } else {
+                        currentCommand.write(b);
+                    }
+                } else {
+                    // read desired content length
+                    if (payLoadRead == contentLength) {
+                        processCommand();
+                        i += processHeader(b, input);
+                    } else {
+                        currentCommand.write(b);
+                        payLoadRead++;
+                    }
+                }
+            }
+
+            previousByte = b;
+        }
+        if (processedHeader && payLoadRead == contentLength) {
+            processCommand();
+        }
+    }
 
-           previousByte = b;
-       }
+    /**
+     * sets the content length
+     *
+     * @return number of bytes read
+     */
+    private int processHeader(byte header, DataByteArrayInputStream input) {
+        this.header = header;
+        byte digit;
+        int multiplier = 1;
+        int read = 0;
+        int length = 0;
+        do {
+            digit = input.readByte();
+            length += (digit & 0x7F) * multiplier;
+            multiplier <<= 7;
+            read++;
+        } while ((digit & 0x80) != 0);
+
+        contentLength = length;
+        processedHeader = true;
+        return read;
     }
 
-    protected void processCommand() throws Exception {
-        StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
+
+    private void processCommand() throws Exception {
+        MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
         transport.doConsume(frame);
-        processedHeaders = false;
+        processedHeader = false;
         currentCommand.reset();
         contentLength = -1;
+        payLoadRead = 0;
     }
+
+    public static String commandType(byte header) throws IOException, JMSException {
+
+        byte messageType = (byte) ((header & 0xF0) >>> 4);
+        switch (messageType) {
+            case PINGREQ.TYPE: {
+                return "PINGREQ";
+            }
+            case CONNECT.TYPE: {
+                return "CONNECT";
+            }
+            case DISCONNECT.TYPE: {
+                return "DISCONNECT";
+            }
+            case SUBSCRIBE.TYPE: {
+                return "SUBSCRIBE";
+            }
+            case UNSUBSCRIBE.TYPE: {
+                return "UNSUBSCRIBE";
+            }
+            case PUBLISH.TYPE: {
+                return "PUBLISH";
+            }
+            case PUBACK.TYPE: {
+                return "PUBACK";
+            }
+            case PUBREC.TYPE: {
+                return "PUBREC";
+            }
+            case PUBREL.TYPE: {
+                return "PUBREL";
+            }
+            case PUBCOMP.TYPE: {
+                return "PUBCOMP";
+            }
+            default:
+                return "UNKNOWN";
+        }
+
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Wed Apr 11 11:52:06 2012
@@ -62,6 +62,7 @@ public class MQTTInactivityMonitor exten
     private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
     private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
     private boolean keepAliveResponseRequired;
+    private MQTTProtocolConverter protocolConverter;
 
 
     private final Runnable readChecker = new Runnable() {
@@ -125,6 +126,9 @@ public class MQTTInactivityMonitor exten
             }
             ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
+                    if (protocolConverter != null) {
+                        protocolConverter.onTransportError();
+                    }
                     onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
                 }
 
@@ -225,6 +229,14 @@ public class MQTTInactivityMonitor exten
         return this.monitorStarted.get();
     }
 
+    public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
+        this.protocolConverter = protocolConverter;
+    }
+
+    public MQTTProtocolConverter getProtocolConverter() {
+        return protocolConverter;
+    }
+
     synchronized void startMonitorThread() {
         if (monitorStarted.get()) {
             return;

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java Wed Apr 11 11:52:06 2012
@@ -14,34 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
-import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.wireformat.WireFormat;
-
-import javax.net.SocketFactory;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
-public class StompNIOSSLTransport extends NIOSSLTransport {
+import javax.net.SocketFactory;
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+public class MQTTNIOSSLTransport extends NIOSSLTransport {
 
-    StompCodec codec;
+    MQTTCodec codec;
 
-    public StompNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+    public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
     }
 
-    public StompNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
+    public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
         super(wireFormat, socket);
     }
 
     @Override
     protected void initializeStreams() throws IOException {
-        codec = new StompCodec(this);
+        codec = new MQTTCodec(this);
         super.initializeStreams();
         if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
             serviceRead();
@@ -52,8 +52,8 @@ public class StompNIOSSLTransport extend
     protected void processCommand(ByteBuffer plain) throws Exception {
         byte[] fill = new byte[plain.remaining()];
         plain.get(fill);
-        ByteArrayInputStream input = new ByteArrayInputStream(fill);
-        codec.parse(input, fill.length);
+        DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
+        codec.parse(dis, fill.length);
     }
 
-}
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java Wed Apr 11 11:52:06 2012
@@ -14,8 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
@@ -23,16 +32,7 @@ import org.apache.activemq.transport.tcp
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.wireformat.WireFormat;
 
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-
-public class StompNIOSSLTransportFactory extends StompNIOTransportFactory {
+public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
 
     SSLContext context;
 
@@ -40,7 +40,7 @@ public class StompNIOSSLTransportFactory
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new TcpTransportServer(this, location, serverSocketFactory) {
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-                StompNIOSSLTransport transport = new StompNIOSSLTransport(format, socket);
+                MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
                 if (context != null) {
                     transport.setSslContext(context);
                 }
@@ -51,12 +51,12 @@ public class StompNIOSSLTransportFactory
 
     @Override
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-        return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
+        return new MQTTNIOSSLTransport(wf, socketFactory, location, localLocation);
     }
 
     @Override
     public TransportServer doBind(URI location) throws IOException {
-       if (SslContext.getCurrentSslContext() != null) {
+        if (SslContext.getCurrentSslContext() != null) {
             try {
                 context = SslContext.getCurrentSslContext().getSSLContext();
             } catch (Exception e) {

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java Wed Apr 11 11:52:06 2012
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -28,8 +27,6 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
-
-import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
@@ -37,41 +34,41 @@ import org.apache.activemq.transport.tcp
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
 
 /**
- * An implementation of the {@link Transport} interface for using Stomp over NIO
- *
- *
+ * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
  */
-public class StompNIOTransport extends TcpTransport {
+public class MQTTNIOTransport extends TcpTransport {
 
     private SocketChannel channel;
     private SelectorSelection selection;
 
     private ByteBuffer inputBuffer;
-    StompCodec codec;
+    MQTTCodec codec;
 
-    public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+    public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
     }
 
-    public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+    public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
         super(wireFormat, socket);
     }
 
     protected void initializeStreams() throws IOException {
         channel = socket.getChannel();
         channel.configureBlocking(false);
-
         // listen for events telling us when the socket is readable.
         selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
             public void onSelect(SelectorSelection selection) {
-                serviceRead();
+                if (!isStopped()) {
+                    serviceRead();
+                }
             }
 
             public void onError(SelectorSelection selection, Throwable error) {
                 if (error instanceof IOException) {
-                    onException((IOException)error);
+                    onException((IOException) error);
                 } else {
                     onException(IOExceptionSupport.create(error));
                 }
@@ -82,35 +79,34 @@ public class StompNIOTransport extends T
         NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
         this.dataOut = new DataOutputStream(outPutStream);
         this.buffOut = outPutStream;
-        codec = new StompCodec(this);
+        codec = new MQTTCodec(this);
     }
 
     private void serviceRead() {
         try {
 
-           while (true) {
-               // read channel
-               int readSize = channel.read(inputBuffer);
-               // channel is closed, cleanup
-               if (readSize == -1) {
-                   onException(new EOFException());
-                   selection.close();
-                   break;
-               }
-               // nothing more to read, break
-               if (readSize == 0) {
-                   break;
-               }
-
-               inputBuffer.flip();
+            while (isStarted()) {
+                // read channel
+                int readSize = channel.read(inputBuffer);
+                // channel is closed, cleanup
+                if (readSize == -1) {
+                    onException(new EOFException());
+                    selection.close();
+                    break;
+                }
+                // nothing more to read, break
+                if (readSize == 0) {
+                    break;
+                }
 
-               ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
-               codec.parse(input, readSize);
+                inputBuffer.flip();
+                DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
+                codec.parse(dis, readSize);
 
-               // clear the buffer
-               inputBuffer.clear();
+                // clear the buffer
+                inputBuffer.clear();
 
-           }
+            }
         } catch (IOException e) {
             onException(e);
         } catch (Throwable e) {
@@ -133,4 +129,4 @@ public class StompNIOTransport extends T
             super.doStop(stopper);
         }
     }
-}
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java Wed Apr 11 11:52:06 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 import java.net.Socket;
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
-
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -39,28 +38,26 @@ import org.apache.activemq.util.Introspe
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
- *
- *
+ * A <a href="http://mqtt.org/">MQTT</a> over NIO transport factory
  */
-public class StompNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
+public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
 
     private BrokerContext brokerContext = null;
 
     protected String getDefaultWireFormatType() {
-        return "stomp";
+        return "mqtt";
     }
 
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new TcpTransportServer(this, location, serverSocketFactory) {
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-                return new StompNIOTransport(format, socket);
+                return new MQTTNIOTransport(format, socket);
             }
         };
     }
 
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-        return new StompNIOTransport(wf, socketFactory, location, localLocation);
+        return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
     }
 
     @SuppressWarnings("rawtypes")
@@ -78,7 +75,7 @@ public class StompNIOTransportFactory ex
 
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, format, brokerContext);
+        transport = new MQTTTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
@@ -87,5 +84,12 @@ public class StompNIOTransportFactory ex
         this.brokerContext = brokerService.getBrokerContext();
     }
 
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
+        MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+        return monitor;
+    }
+
 }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Wed Apr 11 11:52:06 2012
@@ -28,7 +28,6 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.*;
-import org.apache.activemq.transport.stomp.ProtocolException;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -56,16 +55,14 @@ class MQTTProtocolConverter {
     private final ProducerId producerId = new ProducerId(sessionId, 1);
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
-    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
     private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
     private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
-    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
-    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
     private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
     private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
+    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
     private final MQTTTransport mqttTransport;
 
     private final Object commnadIdMutex = new Object();
@@ -143,6 +140,18 @@ class MQTTProtocolConverter {
                 onMQTTPubAck(new PUBACK().decode(frame));
                 break;
             }
+            case PUBREC.TYPE: {
+                onMQTTPubRec(new PUBREC().decode(frame));
+                break;
+            }
+            case PUBREL.TYPE: {
+                onMQTTPubRel(new PUBREL().decode(frame));
+                break;
+            }
+            case PUBCOMP.TYPE: {
+                onMQTTPubComp(new PUBCOMP().decode(frame));
+                break;
+            }
             default:
                 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
         }
@@ -150,10 +159,10 @@ class MQTTProtocolConverter {
     }
 
 
-    void onMQTTConnect(final CONNECT connect) throws ProtocolException {
+    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
 
         if (connected.get()) {
-            throw new ProtocolException("All ready connected.");
+            throw new MQTTProtocolException("All ready connected.");
         }
         this.connect = connect;
 
@@ -268,7 +277,7 @@ class MQTTProtocolConverter {
             consumerInfo.setSubscriptionName(connect.clientId().toString());
         }
 
-        MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo);
+        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
 
 
         subscriptionsByConsumerId.put(id, mqttSubscription);
@@ -327,13 +336,16 @@ class MQTTProtocolConverter {
             MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
                 MessageAck ack = sub.createMessageAck(md);
-                PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage());
-                if (ack != null) {
+                PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
+                if (ack != null && sub.expectAck()) {
                     synchronized (consumerAcks) {
                         consumerAcks.put(publish.messageId(), ack);
                     }
                 }
                 getMQTTTransport().sendToMQTT(publish.encode());
+                if (ack != null && !sub.expectAck()) {
+                    getMQTTTransport().sendToActiveMQ(ack);
+                }
             }
         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
             // Pass down any unexpected async errors. Should this close the connection?
@@ -356,7 +368,38 @@ class MQTTProtocolConverter {
 
     void onMQTTPubAck(PUBACK command) {
         short messageId = command.messageId();
-        MessageAck ack = null;
+        MessageAck ack;
+        synchronized (consumerAcks) {
+            ack = consumerAcks.remove(messageId);
+        }
+        if (ack != null) {
+            getMQTTTransport().sendToActiveMQ(ack);
+        }
+    }
+
+    void onMQTTPubRec(PUBREC commnand) {
+        //from a subscriber - send a PUBREL in response
+        PUBREL pubrel = new PUBREL();
+        pubrel.messageId(commnand.messageId());
+        sendToMQTT(pubrel.encode());
+    }
+
+    void onMQTTPubRel(PUBREL command) {
+        PUBREC ack;
+        synchronized (publisherRecs) {
+            ack = publisherRecs.remove(command.messageId());
+        }
+        if (ack == null) {
+            LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+        }
+        PUBCOMP pubcomp = new PUBCOMP();
+        pubcomp.messageId(command.messageId());
+        sendToMQTT(pubcomp.encode());
+    }
+
+    void onMQTTPubComp(PUBCOMP command) {
+        short messageId = command.messageId();
+        MessageAck ack;
         synchronized (consumerAcks) {
             ack = consumerAcks.remove(messageId);
         }
@@ -461,19 +504,24 @@ class MQTTProtocolConverter {
         return mqttTransport;
     }
 
-    public ActiveMQDestination createTempDestination(String name, boolean topic) {
-        ActiveMQDestination rc = tempDestinations.get(name);
-        if (rc == null) {
-            if (topic) {
-                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
-            } else {
-                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
+    public void onTransportError() {
+        if (connect != null) {
+            if (connect.willTopic() != null && connect.willMessage() != null) {
+                try {
+                    PUBLISH publish = new PUBLISH();
+                    publish.topicName(connect.willTopic());
+                    publish.qos(connect.willQos());
+                    publish.payload(connect.willMessage());
+                    ActiveMQMessage message = convertMessage(publish);
+                    message.setProducerId(producerId);
+                    message.onSend();
+                    sendToActiveMQ(message, null);
+                } catch (Exception e) {
+                    LOG.warn("Failed to publish Will Message " + connect.willMessage());
+                }
+
             }
-            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
-            tempDestinations.put(name, rc);
-            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
         }
-        return rc;
     }
 
 
@@ -482,7 +530,7 @@ class MQTTProtocolConverter {
 
             int heartBeatMS = heartBeat * 1000;
             MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
-
+            monitor.setProtocolConverter(this);
             monitor.setReadCheckTime(heartBeatMS);
             monitor.setInitialDelayTime(heartBeatMS);
             monitor.startMonitorThread();
@@ -555,8 +603,11 @@ class MQTTProtocolConverter {
                             if (response.isException()) {
                                 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
                             } else {
-                                PUBACK ack = new PUBACK();
+                                PUBREC ack = new PUBREC();
                                 ack.messageId(command.messageId());
+                                synchronized (publisherRecs) {
+                                    publisherRecs.put(command.messageId(), ack);
+                                }
                                 converter.getMQTTTransport().sendToMQTT(ack.encode());
                             }
                         }
@@ -565,27 +616,6 @@ class MQTTProtocolConverter {
                     break;
             }
         }
-        /*
-        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-        if (receiptId != null) {
-            return new ResponseHandler() {
-                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-                    if (response.isException()) {
-                        // Generally a command can fail.. but that does not invalidate the connection.
-                        // We report back the failure but we don't close the connection.
-                        Throwable exception = ((ExceptionResponse)response).getException();
-                        handleException(exception, command);
-                    } else {
-                        StompFrame sc = new StompFrame();
-                        sc.setAction(Stomp.Responses.RECEIPT);
-                        sc.setHeaders(new HashMap<String, String>(1));
-                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-                        stompTransport.sendToStomp(sc);
-                    }
-                }
-            };
-        }
-        */
         return null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java Wed Apr 11 11:52:06 2012
@@ -16,11 +16,17 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+
+import javax.jms.JMSException;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.codec.PUBLISH;
 
 /**
  * Keeps track of the MQTT client subscription so that acking is correctly done.
@@ -39,16 +45,20 @@ class MQTTSubscription {
     }
 
     MessageAck createMessageAck(MessageDispatch md) {
+        return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+    }
 
-        switch (qos) {
-            case AT_MOST_ONCE: {
-                return null;
-            }
-
+    PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
+        PUBLISH publish = protocolConverter.convertMessage(message);
+        if (publish.qos().ordinal() > this.qos.ordinal()) {
+            publish.qos(this.qos);
         }
-        return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+        return publish;
     }
 
+    public boolean expectAck() {
+        return qos != QoS.AT_MOST_ONCE;
+    }
 
     public void setDestination(ActiveMQDestination destination) {
         this.destination = destination;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Wed Apr 11 11:52:06 2012
@@ -33,8 +33,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The StompTransportFilter normally sits on top of a TcpTransport that has been
- * configured with the StompWireFormat and is used to convert STOMP commands to
+ * The MQTTTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the StompWireFormat and is used to convert MQTT commands to
  * ActiveMQ commands. All of the conversion work is done by delegating to the
  * MQTTProtocolConverter
  */
@@ -73,7 +73,7 @@ public class MQTTTransportFilter extends
 
             protocolConverter.onMQTTCommand((MQTTFrame) command);
         } catch (IOException e) {
-            onException(e);
+            handleException(e);
         } catch (JMSException e) {
             onException(IOExceptionSupport.create(e));
         }
@@ -129,5 +129,10 @@ public class MQTTTransportFilter extends
         return this.wireFormat;
     }
 
+    public void handleException(IOException e) {
+        protocolConverter.onTransportError();
+        super.onException(e);
+    }
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java Wed Apr 11 11:52:06 2012
@@ -36,7 +36,7 @@ import org.fusesource.mqtt.codec.MQTTFra
 public class MQTTWireFormat implements WireFormat {
 
 
-    private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
+    static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
 
     private boolean encodingEnabled = false;
     private int version = 1;
@@ -79,8 +79,7 @@ public class MQTTWireFormat implements W
     public Object unmarshal(DataInput dataIn) throws IOException {
         byte header = dataIn.readByte();
 
-        byte digit = 0;
-
+        byte digit;
         int multiplier = 1;
         int length = 0;
         do {
@@ -89,6 +88,7 @@ public class MQTTWireFormat implements W
             multiplier <<= 7;
         }
         while ((digit & 0x80) != 0);
+
         if (length >= 0) {
             if (length > MAX_MESSAGE_LENGTH) {
                 throw new IOException("The maximum message length was exceeded");

Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio (from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio Wed Apr 11 11:52:06 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompNIOTransportFactory
+class=org.apache.activemq.transport.mqtt.MQTTNIOTransportFactory

Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl (from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio%2Bssl?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio%2Bssl&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio%2Bssl&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl Wed Apr 11 11:52:06 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompNIOSSLTransportFactory
\ No newline at end of file
+class=org.apache.activemq.transport.mqtt.MQTTNIOSSLTransportFactory
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java?rev=1324714&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java Wed Apr 11 11:52:06 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import org.apache.activemq.broker.BrokerService;
+import org.fusesource.mqtt.client.MQTT;
+
+public class MQTTSSLTest extends MQTTTest {
+    public void startBroker() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        super.startBroker();
+    }
+
+    protected void addMQTTConnector(BrokerService brokerService) throws Exception {
+        brokerService.addConnector("mqtt+ssl://localhost:8883");
+    }
+
+    protected MQTT createMQTTConnection() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost("ssl://localhost:8883");
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+        mqtt.setSslContext(ctx);
+        return mqtt;
+    }
+
+    static class DefaultTrustManager implements X509TrustManager {
+
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        }
+
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        }
+
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
+
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Wed Apr 11 11:52:06 2012
@@ -36,13 +36,15 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 
 public class MQTTTest {
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
-    BrokerService brokerService;
-    Vector<Throwable> exceptions = new Vector<Throwable>();
+    protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
+    protected BrokerService brokerService;
+    protected Vector<Throwable> exceptions = new Vector<Throwable>();
+    protected int numberOfMessages;
 
     @Before
     public void startBroker() throws Exception {
@@ -50,6 +52,7 @@ public class MQTTTest {
         brokerService = new BrokerService();
         brokerService.setPersistent(false);
         brokerService.setAdvisorySupport(false);
+        this.numberOfMessages = 2000;
     }
 
     @After
@@ -60,19 +63,40 @@ public class MQTTTest {
     }
 
     @Test
-    public void testSendAndReceiveAtLeastOnce() throws Exception {
+    public void testSendAndReceiveAtMostOnce() throws Exception {
+        addMQTTConnector(brokerService);
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive(Short.MAX_VALUE);
+        BlockingConnection connection = mqtt.blockingConnection();
 
-        brokerService.addConnector("mqtt://localhost:1883");
+        connection.connect();
+
+
+        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
+        connection.subscribe(topics);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
+            Message message = connection.receive();
+            assertEquals(payload, new String(message.getPayload()));
+        }
+        connection.disconnect();
+    }
+
+    @Test
+    public void testSendAndReceiveAtLeastOnce() throws Exception {
+        addMQTTConnector(brokerService);
         brokerService.start();
-        MQTT mqtt = new MQTT();
-        mqtt.setHost("localhost", 1883);
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive(Short.MAX_VALUE);
         BlockingConnection connection = mqtt.blockingConnection();
 
         connection.connect();
 
         Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
         connection.subscribe(topics);
-        for (int i = 0; i < 10000; i++) {
+        for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
             Message message = connection.receive();
@@ -83,13 +107,70 @@ public class MQTTTest {
     }
 
     @Test
-    public void testSendMQTTReceiveJMS() throws Exception {
+    public void testSendAndReceiveExactlyOnce() throws Exception {
+        addMQTTConnector(brokerService);
+        brokerService.start();
+        MQTT publisher = createMQTTConnection();
+        BlockingConnection pubConnection = publisher.blockingConnection();
 
-        brokerService.addConnector("mqtt://localhost:1883");
+        pubConnection.connect();
+
+        MQTT subscriber = createMQTTConnection();
+        BlockingConnection subConnection = subscriber.blockingConnection();
+
+        subConnection.connect();
+
+        Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
+        subConnection.subscribe(topics);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
+            Message message = subConnection.receive();
+            message.ack();
+            assertEquals(payload, new String(message.getPayload()));
+        }
+        subConnection.disconnect();
+        pubConnection.disconnect();
+    }
+
+    @Test
+    public void testSendAndReceiveLargeMessages() throws Exception {
+        byte[] payload = new byte[1024 * 32];
+        for (int i = 0; i < payload.length; i++){
+            payload[i] = '2';
+        }
+        addMQTTConnector(brokerService);
+        brokerService.start();
+
+        MQTT publisher = createMQTTConnection();
+        BlockingConnection pubConnection = publisher.blockingConnection();
+
+        pubConnection.connect();
+
+        MQTT subscriber = createMQTTConnection();
+        BlockingConnection subConnection = subscriber.blockingConnection();
+
+        subConnection.connect();
+
+        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
+        subConnection.subscribe(topics);
+        for (int i = 0; i < 10; i++) {
+            pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
+            Message message = subConnection.receive();
+            message.ack();
+            assertArrayEquals(payload, message.getPayload());
+        }
+        subConnection.disconnect();
+        pubConnection.disconnect();
+    }
+
+
+    @Test
+    public void testSendMQTTReceiveJMS() throws Exception {
+        addMQTTConnector(brokerService);
         brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
         brokerService.start();
-        MQTT mqtt = new MQTT();
-        mqtt.setHost("localhost", 1883);
+        MQTT mqtt = createMQTTConnection();
         BlockingConnection connection = mqtt.blockingConnection();
         final String DESTINATION_NAME = "foo";
         connection.connect();
@@ -100,7 +181,7 @@ public class MQTTTest {
         javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
         MessageConsumer consumer = s.createConsumer(jmsTopic);
 
-        for (int i = 0; i < 10000; i++) {
+        for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
             ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
@@ -113,5 +194,15 @@ public class MQTTTest {
         connection.disconnect();
     }
 
+    protected void addMQTTConnector(BrokerService brokerService) throws Exception {
+        brokerService.addConnector("mqtt://localhost:1883");
+    }
+
+    protected MQTT createMQTTConnection() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost("localhost", 1883);
+        return mqtt;
+    }
+
 
 }
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java?rev=1324714&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java Wed Apr 11 11:52:06 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.broker.BrokerService;
+
+public class MTQQNioTest extends MQTTTest {
+    protected void addMQTTConnector(BrokerService brokerService) throws Exception {
+        brokerService.addConnector("mqtt+nio://localhost:1883?maxInactivityDuration=-1");
+    }
+
+}