You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/11 13:52:07 UTC
svn commit: r1324714 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/mqtt/
main/resources/META-INF/services/org/apache/activemq/transport/
test/java/org/apache/activemq/transport/mqtt/
Author: rajdavies
Date: Wed Apr 11 11:52:06 2012
New Revision: 1324714
URL: http://svn.apache.org/viewvc?rev=1324714&view=rev
Log:
added NIO support to the MQTT protocol for https://issues.apache.org/jira/browse/AMQ-3786
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl
- copied, changed from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java Wed Apr 11 11:52:06 2012
@@ -14,89 +14,148 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.DataByteArrayInputStream;
+import java.io.IOException;
-import java.io.ByteArrayInputStream;
-import java.util.HashMap;
+import javax.jms.JMSException;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.mqtt.codec.*;
-public class StompCodec {
+public class MQTTCodec {
- final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
TcpTransport transport;
- ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
- boolean processedHeaders = false;
+ DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
+ boolean processedHeader = false;
String action;
- HashMap<String, String> headers;
+ byte header;
int contentLength = -1;
- int readLength = 0;
int previousByte = -1;
+ int payLoadRead = 0;
- public StompCodec(TcpTransport transport) {
+ public MQTTCodec(TcpTransport transport) {
this.transport = transport;
}
- public void parse(ByteArrayInputStream input, int readSize) throws Exception {
- int i = 0;
- int b;
- while(i++ < readSize) {
- b = input.read();
- // skip repeating nulls
- if (!processedHeaders && previousByte == 0 && b == 0) {
- continue;
- }
-
- if (!processedHeaders) {
- currentCommand.write(b);
- // end of headers section, parse action and header
- if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
- if (transport.getWireFormat() instanceof StompWireFormat) {
- DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
- action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
- headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
- String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
- if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
- contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
- } else {
- contentLength = -1;
- }
- }
- processedHeaders = true;
- currentCommand.reset();
- }
- } else {
-
- if (contentLength == -1) {
- // end of command reached, unmarshal
- if (b == 0) {
- processCommand();
- } else {
- currentCommand.write(b);
- }
- } else {
- // read desired content length
- if (readLength++ == contentLength) {
- processCommand();
- readLength = 0;
- } else {
- currentCommand.write(b);
- }
- }
- }
+ public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
+ int i = 0;
+ byte b;
+ while (i++ < readSize) {
+ b = input.readByte();
+ // skip repeating nulls
+ if (!processedHeader && b == 0) {
+ previousByte = 0;
+ continue;
+ }
+
+ if (!processedHeader) {
+ i += processHeader(b, input);
+ if (contentLength == 0) {
+ processCommand();
+ }
+
+ } else {
+
+ if (contentLength == -1) {
+ // end of command reached, unmarshal
+ if (b == 0) {
+ processCommand();
+ } else {
+ currentCommand.write(b);
+ }
+ } else {
+ // read desired content length
+ if (payLoadRead == contentLength) {
+ processCommand();
+ i += processHeader(b, input);
+ } else {
+ currentCommand.write(b);
+ payLoadRead++;
+ }
+ }
+ }
+
+ previousByte = b;
+ }
+ if (processedHeader && payLoadRead == contentLength) {
+ processCommand();
+ }
+ }
- previousByte = b;
- }
+ /**
+ * sets the content length
+ *
+ * @return number of bytes read
+ */
+ private int processHeader(byte header, DataByteArrayInputStream input) {
+ this.header = header;
+ byte digit;
+ int multiplier = 1;
+ int read = 0;
+ int length = 0;
+ do {
+ digit = input.readByte();
+ length += (digit & 0x7F) * multiplier;
+ multiplier <<= 7;
+ read++;
+ } while ((digit & 0x80) != 0);
+
+ contentLength = length;
+ processedHeader = true;
+ return read;
}
- protected void processCommand() throws Exception {
- StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
+
+ private void processCommand() throws Exception {
+ MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
transport.doConsume(frame);
- processedHeaders = false;
+ processedHeader = false;
currentCommand.reset();
contentLength = -1;
+ payLoadRead = 0;
}
+
+ public static String commandType(byte header) throws IOException, JMSException {
+
+ byte messageType = (byte) ((header & 0xF0) >>> 4);
+ switch (messageType) {
+ case PINGREQ.TYPE: {
+ return "PINGREQ";
+ }
+ case CONNECT.TYPE: {
+ return "CONNECT";
+ }
+ case DISCONNECT.TYPE: {
+ return "DISCONNECT";
+ }
+ case SUBSCRIBE.TYPE: {
+ return "SUBSCRIBE";
+ }
+ case UNSUBSCRIBE.TYPE: {
+ return "UNSUBSCRIBE";
+ }
+ case PUBLISH.TYPE: {
+ return "PUBLISH";
+ }
+ case PUBACK.TYPE: {
+ return "PUBACK";
+ }
+ case PUBREC.TYPE: {
+ return "PUBREC";
+ }
+ case PUBREL.TYPE: {
+ return "PUBREL";
+ }
+ case PUBCOMP.TYPE: {
+ return "PUBCOMP";
+ }
+ default:
+ return "UNKNOWN";
+ }
+
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Wed Apr 11 11:52:06 2012
@@ -62,6 +62,7 @@ public class MQTTInactivityMonitor exten
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
private boolean keepAliveResponseRequired;
+ private MQTTProtocolConverter protocolConverter;
private final Runnable readChecker = new Runnable() {
@@ -125,6 +126,9 @@ public class MQTTInactivityMonitor exten
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
+ if (protocolConverter != null) {
+ protocolConverter.onTransportError();
+ }
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
}
@@ -225,6 +229,14 @@ public class MQTTInactivityMonitor exten
return this.monitorStarted.get();
}
+ public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
+ this.protocolConverter = protocolConverter;
+ }
+
+ public MQTTProtocolConverter getProtocolConverter() {
+ return protocolConverter;
+ }
+
synchronized void startMonitorThread() {
if (monitorStarted.get()) {
return;
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java Wed Apr 11 11:52:06 2012
@@ -14,34 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
-import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.wireformat.WireFormat;
-
-import javax.net.SocketFactory;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-public class StompNIOSSLTransport extends NIOSSLTransport {
+import javax.net.SocketFactory;
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+public class MQTTNIOSSLTransport extends NIOSSLTransport {
- StompCodec codec;
+ MQTTCodec codec;
- public StompNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
- public StompNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
}
@Override
protected void initializeStreams() throws IOException {
- codec = new StompCodec(this);
+ codec = new MQTTCodec(this);
super.initializeStreams();
if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
serviceRead();
@@ -52,8 +52,8 @@ public class StompNIOSSLTransport extend
protected void processCommand(ByteBuffer plain) throws Exception {
byte[] fill = new byte[plain.remaining()];
plain.get(fill);
- ByteArrayInputStream input = new ByteArrayInputStream(fill);
- codec.parse(input, fill.length);
+ DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
+ codec.parse(dis, fill.length);
}
-}
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java Wed Apr 11 11:52:06 2012
@@ -14,8 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
@@ -23,16 +32,7 @@ import org.apache.activemq.transport.tcp
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-
-public class StompNIOSSLTransportFactory extends StompNIOTransportFactory {
+public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
SSLContext context;
@@ -40,7 +40,7 @@ public class StompNIOSSLTransportFactory
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
- StompNIOSSLTransport transport = new StompNIOSSLTransport(format, socket);
+ MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
if (context != null) {
transport.setSslContext(context);
}
@@ -51,12 +51,12 @@ public class StompNIOSSLTransportFactory
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
- return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
+ return new MQTTNIOSSLTransport(wf, socketFactory, location, localLocation);
}
@Override
public TransportServer doBind(URI location) throws IOException {
- if (SslContext.getCurrentSslContext() != null) {
+ if (SslContext.getCurrentSslContext() != null) {
try {
context = SslContext.getCurrentSslContext().getSSLContext();
} catch (Exception e) {
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java Wed Apr 11 11:52:06 2012
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
-import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -28,8 +27,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
-
-import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
@@ -37,41 +34,41 @@ import org.apache.activemq.transport.tcp
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
/**
- * An implementation of the {@link Transport} interface for using Stomp over NIO
- *
- *
+ * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
*/
-public class StompNIOTransport extends TcpTransport {
+public class MQTTNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
private ByteBuffer inputBuffer;
- StompCodec codec;
+ MQTTCodec codec;
- public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
- public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
}
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
channel.configureBlocking(false);
-
// listen for events telling us when the socket is readable.
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
public void onSelect(SelectorSelection selection) {
- serviceRead();
+ if (!isStopped()) {
+ serviceRead();
+ }
}
public void onError(SelectorSelection selection, Throwable error) {
if (error instanceof IOException) {
- onException((IOException)error);
+ onException((IOException) error);
} else {
onException(IOExceptionSupport.create(error));
}
@@ -82,35 +79,34 @@ public class StompNIOTransport extends T
NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
- codec = new StompCodec(this);
+ codec = new MQTTCodec(this);
}
private void serviceRead() {
try {
- while (true) {
- // read channel
- int readSize = channel.read(inputBuffer);
- // channel is closed, cleanup
- if (readSize == -1) {
- onException(new EOFException());
- selection.close();
- break;
- }
- // nothing more to read, break
- if (readSize == 0) {
- break;
- }
-
- inputBuffer.flip();
+ while (isStarted()) {
+ // read channel
+ int readSize = channel.read(inputBuffer);
+ // channel is closed, cleanup
+ if (readSize == -1) {
+ onException(new EOFException());
+ selection.close();
+ break;
+ }
+ // nothing more to read, break
+ if (readSize == 0) {
+ break;
+ }
- ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
- codec.parse(input, readSize);
+ inputBuffer.flip();
+ DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
+ codec.parse(dis, readSize);
- // clear the buffer
- inputBuffer.clear();
+ // clear the buffer
+ inputBuffer.clear();
- }
+ }
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
@@ -133,4 +129,4 @@ public class StompNIOTransport extends T
super.doStop(stopper);
}
}
-}
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java (from r1310205, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java Wed Apr 11 11:52:06 2012
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
@@ -26,7 +26,6 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
-
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -39,28 +38,26 @@ import org.apache.activemq.util.Introspe
import org.apache.activemq.wireformat.WireFormat;
/**
- * A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
- *
- *
+ * A <a href="http://mqtt.org/">MQTT</a> over NIO transport factory
*/
-public class StompNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
+public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
- return "stomp";
+ return "mqtt";
}
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
- return new StompNIOTransport(format, socket);
+ return new MQTTNIOTransport(format, socket);
}
};
}
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
- return new StompNIOTransport(wf, socketFactory, location, localLocation);
+ return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
}
@SuppressWarnings("rawtypes")
@@ -78,7 +75,7 @@ public class StompNIOTransportFactory ex
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- transport = new StompTransportFilter(transport, format, brokerContext);
+ transport = new MQTTTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
@@ -87,5 +84,12 @@ public class StompNIOTransportFactory ex
this.brokerContext = brokerService.getBrokerContext();
}
+ protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+ MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
+ MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
+ filter.setInactivityMonitor(monitor);
+ return monitor;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Wed Apr 11 11:52:06 2012
@@ -28,7 +28,6 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
-import org.apache.activemq.transport.stomp.ProtocolException;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@@ -56,16 +55,14 @@ class MQTTProtocolConverter {
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
- private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
- private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
- private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
+ private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
private final MQTTTransport mqttTransport;
private final Object commnadIdMutex = new Object();
@@ -143,6 +140,18 @@ class MQTTProtocolConverter {
onMQTTPubAck(new PUBACK().decode(frame));
break;
}
+ case PUBREC.TYPE: {
+ onMQTTPubRec(new PUBREC().decode(frame));
+ break;
+ }
+ case PUBREL.TYPE: {
+ onMQTTPubRel(new PUBREL().decode(frame));
+ break;
+ }
+ case PUBCOMP.TYPE: {
+ onMQTTPubComp(new PUBCOMP().decode(frame));
+ break;
+ }
default:
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
}
@@ -150,10 +159,10 @@ class MQTTProtocolConverter {
}
- void onMQTTConnect(final CONNECT connect) throws ProtocolException {
+ void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
if (connected.get()) {
- throw new ProtocolException("All ready connected.");
+ throw new MQTTProtocolException("All ready connected.");
}
this.connect = connect;
@@ -268,7 +277,7 @@ class MQTTProtocolConverter {
consumerInfo.setSubscriptionName(connect.clientId().toString());
}
- MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo);
+ MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
@@ -327,13 +336,16 @@ class MQTTProtocolConverter {
MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
MessageAck ack = sub.createMessageAck(md);
- PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage());
- if (ack != null) {
+ PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
+ if (ack != null && sub.expectAck()) {
synchronized (consumerAcks) {
consumerAcks.put(publish.messageId(), ack);
}
}
getMQTTTransport().sendToMQTT(publish.encode());
+ if (ack != null && !sub.expectAck()) {
+ getMQTTTransport().sendToActiveMQ(ack);
+ }
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
@@ -356,7 +368,38 @@ class MQTTProtocolConverter {
void onMQTTPubAck(PUBACK command) {
short messageId = command.messageId();
- MessageAck ack = null;
+ MessageAck ack;
+ synchronized (consumerAcks) {
+ ack = consumerAcks.remove(messageId);
+ }
+ if (ack != null) {
+ getMQTTTransport().sendToActiveMQ(ack);
+ }
+ }
+
+ void onMQTTPubRec(PUBREC commnand) {
+ //from a subscriber - send a PUBREL in response
+ PUBREL pubrel = new PUBREL();
+ pubrel.messageId(commnand.messageId());
+ sendToMQTT(pubrel.encode());
+ }
+
+ void onMQTTPubRel(PUBREL command) {
+ PUBREC ack;
+ synchronized (publisherRecs) {
+ ack = publisherRecs.remove(command.messageId());
+ }
+ if (ack == null) {
+ LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+ }
+ PUBCOMP pubcomp = new PUBCOMP();
+ pubcomp.messageId(command.messageId());
+ sendToMQTT(pubcomp.encode());
+ }
+
+ void onMQTTPubComp(PUBCOMP command) {
+ short messageId = command.messageId();
+ MessageAck ack;
synchronized (consumerAcks) {
ack = consumerAcks.remove(messageId);
}
@@ -461,19 +504,24 @@ class MQTTProtocolConverter {
return mqttTransport;
}
- public ActiveMQDestination createTempDestination(String name, boolean topic) {
- ActiveMQDestination rc = tempDestinations.get(name);
- if (rc == null) {
- if (topic) {
- rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
- } else {
- rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
+ public void onTransportError() {
+ if (connect != null) {
+ if (connect.willTopic() != null && connect.willMessage() != null) {
+ try {
+ PUBLISH publish = new PUBLISH();
+ publish.topicName(connect.willTopic());
+ publish.qos(connect.willQos());
+ publish.payload(connect.willMessage());
+ ActiveMQMessage message = convertMessage(publish);
+ message.setProducerId(producerId);
+ message.onSend();
+ sendToActiveMQ(message, null);
+ } catch (Exception e) { // best effort: log with the cause and continue
+ LOG.warn("Failed to publish Will Message " + connect.willMessage(), e);
+ }
+
}
- sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
- tempDestinations.put(name, rc);
- tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
- return rc;
}
@@ -482,7 +530,7 @@ class MQTTProtocolConverter {
int heartBeatMS = heartBeat * 1000;
MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
-
+ monitor.setProtocolConverter(this);
monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS);
monitor.startMonitorThread();
@@ -555,8 +603,11 @@ class MQTTProtocolConverter {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
} else {
- PUBACK ack = new PUBACK();
+ PUBREC ack = new PUBREC();
ack.messageId(command.messageId());
+ synchronized (publisherRecs) {
+ publisherRecs.put(command.messageId(), ack);
+ }
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
@@ -565,27 +616,6 @@ class MQTTProtocolConverter {
break;
}
}
- /*
- final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null) {
- return new ResponseHandler() {
- public void onResponse(ProtocolConverter converter, Response response) throws IOException {
- if (response.isException()) {
- // Generally a command can fail.. but that does not invalidate the connection.
- // We report back the failure but we don't close the connection.
- Throwable exception = ((ExceptionResponse)response).getException();
- handleException(exception, command);
- } else {
- StompFrame sc = new StompFrame();
- sc.setAction(Stomp.Responses.RECEIPT);
- sc.setHeaders(new HashMap<String, String>(1));
- sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- stompTransport.sendToStomp(sc);
- }
- }
- };
- }
- */
return null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java Wed Apr 11 11:52:06 2012
@@ -16,11 +16,17 @@
*/
package org.apache.activemq.transport.mqtt;
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+
+import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.codec.PUBLISH;
/**
* Keeps track of the MQTT client subscription so that acking is correctly done.
@@ -39,16 +45,20 @@ class MQTTSubscription {
}
MessageAck createMessageAck(MessageDispatch md) {
+ return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+ }
- switch (qos) {
- case AT_MOST_ONCE: {
- return null;
- }
-
+ PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
+ PUBLISH publish = protocolConverter.convertMessage(message);
+ if (publish.qos().ordinal() > this.qos.ordinal()) {
+ publish.qos(this.qos);
}
- return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+ return publish;
}
+ public boolean expectAck() {
+ return qos != QoS.AT_MOST_ONCE;
+ }
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Wed Apr 11 11:52:06 2012
@@ -33,8 +33,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The StompTransportFilter normally sits on top of a TcpTransport that has been
- * configured with the StompWireFormat and is used to convert STOMP commands to
+ * The MQTTTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the MQTTWireFormat and is used to convert MQTT commands to
* ActiveMQ commands. All of the conversion work is done by delegating to the
* MQTTProtocolConverter
*/
@@ -73,7 +73,7 @@ public class MQTTTransportFilter extends
protocolConverter.onMQTTCommand((MQTTFrame) command);
} catch (IOException e) {
- onException(e);
+ handleException(e);
} catch (JMSException e) {
onException(IOExceptionSupport.create(e));
}
@@ -129,5 +129,10 @@ public class MQTTTransportFilter extends
return this.wireFormat;
}
+ public void handleException(IOException e) {
+ protocolConverter.onTransportError();
+ super.onException(e);
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java Wed Apr 11 11:52:06 2012
@@ -36,7 +36,7 @@ import org.fusesource.mqtt.codec.MQTTFra
public class MQTTWireFormat implements WireFormat {
- private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
+ static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
private boolean encodingEnabled = false;
private int version = 1;
@@ -79,8 +79,7 @@ public class MQTTWireFormat implements W
public Object unmarshal(DataInput dataIn) throws IOException {
byte header = dataIn.readByte();
- byte digit = 0;
-
+ byte digit;
int multiplier = 1;
int length = 0;
do {
@@ -89,6 +88,7 @@ public class MQTTWireFormat implements W
multiplier <<= 7;
}
while ((digit & 0x80) != 0);
+
if (length >= 0) {
if (length > MAX_MESSAGE_LENGTH) {
throw new IOException("The maximum message length was exceeded");
Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio (from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio Wed Apr 11 11:52:06 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompNIOTransportFactory
+class=org.apache.activemq.transport.mqtt.MQTTNIOTransportFactory
Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl (from r1310205, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio%2Bssl?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bnio%2Bssl&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio%2Bssl&r1=1310205&r2=1324714&rev=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+nio+ssl Wed Apr 11 11:52:06 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompNIOSSLTransportFactory
\ No newline at end of file
+class=org.apache.activemq.transport.mqtt.MQTTNIOSSLTransportFactory
\ No newline at end of file
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java?rev=1324714&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java Wed Apr 11 11:52:06 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import org.apache.activemq.broker.BrokerService;
+import org.fusesource.mqtt.client.MQTT;
+
+public class MQTTSSLTest extends MQTTTest {
+ public void startBroker() throws Exception {
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ super.startBroker();
+ }
+
+ protected void addMQTTConnector(BrokerService brokerService) throws Exception {
+ brokerService.addConnector("mqtt+ssl://localhost:8883");
+ }
+
+ protected MQTT createMQTTConnection() throws Exception {
+ MQTT mqtt = new MQTT();
+ mqtt.setHost("ssl://localhost:8883");
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+ mqtt.setSslContext(ctx);
+ return mqtt;
+ }
+
+ static class DefaultTrustManager implements X509TrustManager {
+
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1324714&r1=1324713&r2=1324714&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Wed Apr 11 11:52:06 2012
@@ -36,13 +36,15 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class MQTTTest {
- private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
- BrokerService brokerService;
- Vector<Throwable> exceptions = new Vector<Throwable>();
+ protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
+ protected BrokerService brokerService;
+ protected Vector<Throwable> exceptions = new Vector<Throwable>();
+ protected int numberOfMessages;
@Before
public void startBroker() throws Exception {
@@ -50,6 +52,7 @@ public class MQTTTest {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
+ this.numberOfMessages = 2000;
}
@After
@@ -60,19 +63,40 @@ public class MQTTTest {
}
@Test
- public void testSendAndReceiveAtLeastOnce() throws Exception {
+ public void testSendAndReceiveAtMostOnce() throws Exception {
+ addMQTTConnector(brokerService);
+ brokerService.start();
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive(Short.MAX_VALUE);
+ BlockingConnection connection = mqtt.blockingConnection();
- brokerService.addConnector("mqtt://localhost:1883");
+ connection.connect();
+
+
+ Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
+ connection.subscribe(topics);
+ for (int i = 0; i < numberOfMessages; i++) {
+ String payload = "Test Message: " + i;
+ connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
+ Message message = connection.receive();
+ assertEquals(payload, new String(message.getPayload()));
+ }
+ connection.disconnect();
+ }
+
+ @Test
+ public void testSendAndReceiveAtLeastOnce() throws Exception {
+ addMQTTConnector(brokerService);
brokerService.start();
- MQTT mqtt = new MQTT();
- mqtt.setHost("localhost", 1883);
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
connection.subscribe(topics);
- for (int i = 0; i < 10000; i++) {
+ for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Message message = connection.receive();
@@ -83,13 +107,70 @@ public class MQTTTest {
}
@Test
- public void testSendMQTTReceiveJMS() throws Exception {
+ public void testSendAndReceiveExactlyOnce() throws Exception {
+ addMQTTConnector(brokerService);
+ brokerService.start();
+ MQTT publisher = createMQTTConnection();
+ BlockingConnection pubConnection = publisher.blockingConnection();
- brokerService.addConnector("mqtt://localhost:1883");
+ pubConnection.connect();
+
+ MQTT subscriber = createMQTTConnection();
+ BlockingConnection subConnection = subscriber.blockingConnection();
+
+ subConnection.connect();
+
+ Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
+ subConnection.subscribe(topics);
+ for (int i = 0; i < numberOfMessages; i++) {
+ String payload = "Test Message: " + i;
+ pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
+ Message message = subConnection.receive();
+ message.ack();
+ assertEquals(payload, new String(message.getPayload()));
+ }
+ subConnection.disconnect();
+ pubConnection.disconnect();
+ }
+
+ @Test
+ public void testSendAndReceiveLargeMessages() throws Exception {
+ byte[] payload = new byte[1024 * 32];
+ for (int i = 0; i < payload.length; i++){
+ payload[i] = '2';
+ }
+ addMQTTConnector(brokerService);
+ brokerService.start();
+
+ MQTT publisher = createMQTTConnection();
+ BlockingConnection pubConnection = publisher.blockingConnection();
+
+ pubConnection.connect();
+
+ MQTT subscriber = createMQTTConnection();
+ BlockingConnection subConnection = subscriber.blockingConnection();
+
+ subConnection.connect();
+
+ Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
+ subConnection.subscribe(topics);
+ for (int i = 0; i < 10; i++) {
+ pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
+ Message message = subConnection.receive();
+ message.ack();
+ assertArrayEquals(payload, message.getPayload());
+ }
+ subConnection.disconnect();
+ pubConnection.disconnect();
+ }
+
+
+ @Test
+ public void testSendMQTTReceiveJMS() throws Exception {
+ addMQTTConnector(brokerService);
brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
brokerService.start();
- MQTT mqtt = new MQTT();
- mqtt.setHost("localhost", 1883);
+ MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection();
final String DESTINATION_NAME = "foo";
connection.connect();
@@ -100,7 +181,7 @@ public class MQTTTest {
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
MessageConsumer consumer = s.createConsumer(jmsTopic);
- for (int i = 0; i < 10000; i++) {
+ for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
@@ -113,5 +194,15 @@ public class MQTTTest {
connection.disconnect();
}
+ protected void addMQTTConnector(BrokerService brokerService) throws Exception {
+ brokerService.addConnector("mqtt://localhost:1883");
+ }
+
+ protected MQTT createMQTTConnection() throws Exception {
+ MQTT mqtt = new MQTT();
+ mqtt.setHost("localhost", 1883);
+ return mqtt;
+ }
+
}
\ No newline at end of file
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java?rev=1324714&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MTQQNioTest.java Wed Apr 11 11:52:06 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.broker.BrokerService;
+
+public class MTQQNioTest extends MQTTTest {
+ protected void addMQTTConnector(BrokerService brokerService) throws Exception {
+ brokerService.addConnector("mqtt+nio://localhost:1883?maxInactivityDuration=-1");
+ }
+
+}