You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/05 00:58:28 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5307
Repository: activemq
Updated Branches:
refs/heads/trunk 9743dbddb -> 7c04ead46
https://issues.apache.org/jira/browse/AMQ-5307
Fixed the MQTTCodec to properly handle frames that come in split up or
bunched together.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c04ead4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c04ead4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c04ead4
Branch: refs/heads/trunk
Commit: 7c04ead460f646517f39b365ed6168dd1c1e556c
Parents: 9743dbd
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Aug 4 18:58:03 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Aug 4 18:58:03 2014 -0400
----------------------------------------------------------------------
.../activemq/transport/mqtt/MQTTCodec.java | 243 ++++++++++---------
.../transport/mqtt/MQTTNIOSSLTransport.java | 4 +-
.../transport/mqtt/MQTTProtocolSupport.java | 48 ++++
.../transport/mqtt/MQTTAuthTestSupport.java | 8 +
.../activemq/transport/mqtt/MQTTAuthTests.java | 8 +-
.../activemq/transport/mqtt/MQTTCodecTest.java | 178 ++++++++++++++
.../activemq/transport/mqtt/MQTTNIOSSLTest.java | 33 +++
7 files changed, 407 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
index f3e16aa..c892dd1 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
@@ -18,144 +18,165 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
-import javax.jms.JMSException;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
-import org.fusesource.mqtt.codec.*;
+import org.fusesource.mqtt.codec.MQTTFrame;
public class MQTTCodec {
- TcpTransport transport;
+ private final MQTTFrameSink frameSink;
+ private final DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
+ private byte header;
- DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
- boolean processedHeader = false;
- String action;
- byte header;
- int contentLength = -1;
- int previousByte = -1;
- int payLoadRead = 0;
+ private int contentLength = -1;
+ private int payLoadRead = 0;
- public MQTTCodec(TcpTransport transport) {
- this.transport = transport;
+ public interface MQTTFrameSink {
+ void onFrame(MQTTFrame mqttFrame);
}
- public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
- int i = 0;
- byte b;
- while (i++ < readSize) {
- b = input.readByte();
- // skip repeating nulls
- if (!processedHeader && b == 0) {
- previousByte = 0;
- continue;
- }
+ private FrameParser currentParser;
- if (!processedHeader) {
- i += processHeader(b, input);
- if (contentLength == 0) {
- processCommand();
- }
+ // Internal parsers implement this and we switch to the next as we go.
+ private interface FrameParser {
- } else {
+ void parse(DataByteArrayInputStream data, int readSize) throws IOException;
- if (contentLength == -1) {
- // end of command reached, unmarshal
- if (b == 0) {
- processCommand();
- } else {
- currentCommand.write(b);
- }
- } else {
- // read desired content length
- if (payLoadRead == contentLength) {
- processCommand();
- i += processHeader(b, input);
- } else {
- currentCommand.write(b);
- payLoadRead++;
- }
- }
+ void reset() throws IOException;
+ }
+
+ public MQTTCodec(MQTTFrameSink sink) {
+ this.frameSink = sink;
+ }
+
+ public MQTTCodec(final TcpTransport transport) {
+ this.frameSink = new MQTTFrameSink() {
+
+ @Override
+ public void onFrame(MQTTFrame mqttFrame) {
+ transport.doConsume(mqttFrame);
}
+ };
+ }
- previousByte = b;
- }
- if (processedHeader && payLoadRead == contentLength) {
- processCommand();
+ public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
+ if (currentParser == null) {
+ currentParser = initializeHeaderParser();
}
+
+ // Parser stack will run until current incoming data has all been consumed.
+ currentParser.parse(input, readSize);
}
- /**
- * sets the content length
- *
- * @return number of bytes read
- */
- private int processHeader(byte header, DataByteArrayInputStream input) {
- this.header = header;
- byte digit;
- int multiplier = 1;
- int read = 0;
- int length = 0;
- do {
- digit = input.readByte();
- length += (digit & 0x7F) * multiplier;
- multiplier <<= 7;
- read++;
- } while ((digit & 0x80) != 0);
-
- contentLength = length;
- processedHeader = true;
- return read;
+ private void processCommand() throws IOException {
+ MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
+ frameSink.onFrame(frame);
}
+ //----- Prepare the current frame parser for use -------------------------//
- private void processCommand() throws Exception {
- MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
- transport.doConsume(frame);
- processedHeader = false;
- currentCommand.reset();
- contentLength = -1;
- payLoadRead = 0;
+ private FrameParser initializeHeaderParser() throws IOException {
+ headerParser.reset();
+ return headerParser;
}
- public static String commandType(byte header) throws IOException, JMSException {
+ private FrameParser initializeVariableLengthParser() throws IOException {
+ variableLengthParser.reset();
+ return variableLengthParser;
+ }
- byte messageType = (byte) ((header & 0xF0) >>> 4);
- switch (messageType) {
- case PINGREQ.TYPE: {
- return "PINGREQ";
- }
- case CONNECT.TYPE: {
- return "CONNECT";
- }
- case DISCONNECT.TYPE: {
- return "DISCONNECT";
- }
- case SUBSCRIBE.TYPE: {
- return "SUBSCRIBE";
- }
- case UNSUBSCRIBE.TYPE: {
- return "UNSUBSCRIBE";
- }
- case PUBLISH.TYPE: {
- return "PUBLISH";
- }
- case PUBACK.TYPE: {
- return "PUBACK";
- }
- case PUBREC.TYPE: {
- return "PUBREC";
- }
- case PUBREL.TYPE: {
- return "PUBREL";
+ private FrameParser initializeContentParser() throws IOException {
+ contentParser.reset();
+ return contentParser;
+ }
+
+ //----- Frame parser implementations -------------------------------------//
+
+ private final FrameParser headerParser = new FrameParser() {
+
+ @Override
+ public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
+ int i = 0;
+ while (i++ < readSize) {
+ byte b = data.readByte();
+ // skip repeating nulls
+ if (b == 0) {
+ continue;
+ }
+
+ header = b;
+
+ currentParser = initializeVariableLengthParser();
+ currentParser.parse(data, readSize - 1);
+ return;
}
- case PUBCOMP.TYPE: {
- return "PUBCOMP";
+ }
+
+ @Override
+ public void reset() throws IOException {
+ header = -1;
+ }
+ };
+
+ private final FrameParser contentParser = new FrameParser() {
+
+ @Override
+ public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
+ int i = 0;
+ while (i++ < readSize) {
+ currentCommand.write(data.readByte());
+ payLoadRead++;
+
+ if (payLoadRead == contentLength) {
+ processCommand();
+ currentParser = initializeHeaderParser();
+ currentParser.parse(data, readSize - i);
+ return;
+ }
}
- default:
- return "UNKNOWN";
}
- }
+ @Override
+ public void reset() throws IOException {
+ contentLength = -1;
+ payLoadRead = 0;
+ currentCommand.reset();
+ }
+ };
+
+ private final FrameParser variableLengthParser = new FrameParser() {
+
+ private byte digit;
+ private int multiplier = 1;
+ private int length;
+
+ @Override
+ public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
+ int i = 0;
+ while (i++ < readSize) {
+ digit = data.readByte();
+ length += (digit & 0x7F) * multiplier;
+ multiplier <<= 7;
+ if ((digit & 0x80) == 0) {
+ if (length == 0) {
+ processCommand();
+ currentParser = initializeHeaderParser();
+ } else {
+ currentParser = initializeContentParser();
+ contentLength = length;
+ }
+ currentParser.parse(data, readSize - i);
+ return;
+ }
+ }
+ }
+ @Override
+ public void reset() throws IOException {
+ digit = 0;
+ multiplier = 1;
+ length = 0;
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
index ef18fe0..f982848 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
@@ -23,13 +23,14 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
+
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
public class MQTTNIOSSLTransport extends NIOSSLTransport {
- MQTTCodec codec;
+ private MQTTCodec codec;
public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -55,5 +56,4 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
codec.parse(dis, fill.length);
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
index 2def25c..a30ed50 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
@@ -16,6 +16,17 @@
*/
package org.apache.activemq.transport.mqtt;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
+
/**
* A set of static methods useful for handling MQTT based client connections.
*/
@@ -70,4 +81,41 @@ public class MQTTProtocolSupport {
public static String convertActiveMQToMQTT(String destinationName) {
return destinationName.replace('.', '/');
}
+
+ /**
+ * Given an MQTT header byte, determine the command type that the header
+ * represents.
+ *
+ * @param header
+ * the byte value for the MQTT frame header.
+ *
+ * @return a string value for the given command type.
+ */
+ public static String commandType(byte header) {
+ byte messageType = (byte) ((header & 0xF0) >>> 4);
+ switch (messageType) {
+ case PINGREQ.TYPE:
+ return "PINGREQ";
+ case CONNECT.TYPE:
+ return "CONNECT";
+ case DISCONNECT.TYPE:
+ return "DISCONNECT";
+ case SUBSCRIBE.TYPE:
+ return "SUBSCRIBE";
+ case UNSUBSCRIBE.TYPE:
+ return "UNSUBSCRIBE";
+ case PUBLISH.TYPE:
+ return "PUBLISH";
+ case PUBACK.TYPE:
+ return "PUBACK";
+ case PUBREC.TYPE:
+ return "PUBREC";
+ case PUBREL.TYPE:
+ return "PUBREL";
+ case PUBCOMP.TYPE:
+ return "PUBCOMP";
+ default:
+ return "UNKNOWN";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
index bbe4d1a..6e327bf 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
@@ -34,6 +34,14 @@ import org.apache.activemq.security.TempDestinationAuthorizationEntry;
*/
public class MQTTAuthTestSupport extends MQTTTestSupport {
+ public MQTTAuthTestSupport() {
+ super();
+ }
+
+ public MQTTAuthTestSupport(String connectorScheme, boolean useSSL) {
+ super(connectorScheme, useSSL);
+ }
+
@Override
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
index 3cb856f..574d554 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
@@ -66,11 +66,15 @@ public class MQTTAuthTests extends MQTTAuthTestSupport {
return Arrays.asList(new Object[][] {
{"mqtt", false},
{"mqtt+ssl", true},
- {"mqtt+nio", false}
- // TODO - Fails {"mqtt+nio+ssl", true}
+ {"mqtt+nio", false},
+ {"mqtt+nio+ssl", true}
});
}
+ public MQTTAuthTests(String connectorScheme, boolean useSSL) {
+ super(connectorScheme, useSSL);
+ }
+
@Test(timeout = 60 * 1000)
public void testAnonymousUserConnect() throws Exception {
MQTT mqtt = createMQTTConnection();
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
new file mode 100644
index 0000000..31af1ab
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the functionality of the MQTTCodec class.
+ */
+public class MQTTCodecTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTCodecTest.class);
+
+ private final MQTTWireFormat wireFormat = new MQTTWireFormat();
+
+ private List<MQTTFrame> frames;
+ private MQTTCodec codec;
+
+ @Before
+ public void setUp() throws Exception {
+ frames = new ArrayList<MQTTFrame>();
+ codec = new MQTTCodec(new MQTTCodec.MQTTFrameSink() {
+
+ @Override
+ public void onFrame(MQTTFrame mqttFrame) {
+ frames.add(mqttFrame);
+ }
+ });
+ }
+
+ @Test
+ public void testEmptyConnectBytes() throws Exception {
+
+ CONNECT connect = new CONNECT();
+ connect.cleanSession(true);
+ connect.clientId(new UTF8Buffer(""));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(connect.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+ codec.parse(input, marshalled.length());
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(1, frames.size());
+
+ connect = new CONNECT().decode(frames.get(0));
+ LOG.info("Unmarshalled: {}", connect);
+ assertTrue(connect.cleanSession());
+ }
+
+ @Test
+ public void testConnectWithCredentialsBackToBack() throws Exception {
+
+ CONNECT connect = new CONNECT();
+ connect.cleanSession(false);
+ connect.clientId(new UTF8Buffer("test"));
+ connect.userName(new UTF8Buffer("user"));
+ connect.password(new UTF8Buffer("pass"));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(connect.encode(), output);
+ wireFormat.marshal(connect.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+ codec.parse(input, marshalled.length());
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(2, frames.size());
+
+ for (MQTTFrame frame : frames) {
+ connect = new CONNECT().decode(frame);
+ LOG.info("Unmarshalled: {}", connect);
+ assertFalse(connect.cleanSession());
+ assertEquals("user", connect.userName().toString());
+ assertEquals("pass", connect.password().toString());
+ assertEquals("test", connect.clientId().toString());
+ }
+ }
+
+ @Test
+ public void testProcessInChunks() throws Exception {
+
+ CONNECT connect = new CONNECT();
+ connect.cleanSession(false);
+ connect.clientId(new UTF8Buffer("test"));
+ connect.userName(new UTF8Buffer("user"));
+ connect.password(new UTF8Buffer("pass"));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(connect.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+
+ int first = marshalled.length() / 2;
+ int second = marshalled.length() - first;
+
+ codec.parse(input, first);
+ codec.parse(input, second);
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(1, frames.size());
+
+ connect = new CONNECT().decode(frames.get(0));
+ LOG.info("Unmarshalled: {}", connect);
+ assertFalse(connect.cleanSession());
+
+ assertEquals("user", connect.userName().toString());
+ assertEquals("pass", connect.password().toString());
+ assertEquals("test", connect.clientId().toString());
+ }
+
+ @Test
+ public void testProcessInBytes() throws Exception {
+
+ CONNECT connect = new CONNECT();
+ connect.cleanSession(false);
+ connect.clientId(new UTF8Buffer("test"));
+ connect.userName(new UTF8Buffer("user"));
+ connect.password(new UTF8Buffer("pass"));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(connect.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+
+ int size = marshalled.length();
+
+ for (int i = 0; i < size; ++i) {
+ codec.parse(input, 1);
+ }
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(1, frames.size());
+
+ connect = new CONNECT().decode(frames.get(0));
+ LOG.info("Unmarshalled: {}", connect);
+ assertFalse(connect.cleanSession());
+
+ assertEquals("user", connect.userName().toString());
+ assertEquals("pass", connect.password().toString());
+ assertEquals("test", connect.clientId().toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java
new file mode 100644
index 0000000..b6dd9f9
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+/**
+ * Run the basic tests with the NIO Transport.
+ */
+public class MQTTNIOSSLTest extends MQTTTest {
+
+ @Override
+ public String getProtocolScheme() {
+ return "mqtt+nio+ssl";
+ }
+
+ @Override
+ public boolean isUseSSL() {
+ return true;
+ }
+}