You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/05 18:43:57 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5308
Repository: activemq
Updated Branches:
refs/heads/trunk c99e2d837 -> 052d29314
https://issues.apache.org/jira/browse/AMQ-5308
Improve performance of the codec for large message processing.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/052d2931
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/052d2931
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/052d2931
Branch: refs/heads/trunk
Commit: 052d2931434fae255450887e9be288362e17b6fc
Parents: c99e2d8
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Aug 5 12:43:36 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Aug 5 12:43:36 2014 -0400
----------------------------------------------------------------------
.../activemq/transport/mqtt/MQTTCodec.java | 112 +++++++++++-------
.../activemq/transport/mqtt/MQTTCodecTest.java | 114 +++++++++++++++++++
2 files changed, 185 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/052d2931/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
index c892dd1..6970af7 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
@@ -19,31 +19,27 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import org.apache.activemq.transport.tcp.TcpTransport;
+import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.fusesource.mqtt.codec.MQTTFrame;
public class MQTTCodec {
private final MQTTFrameSink frameSink;
- private final DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
- private byte header;
+ private byte header;
private int contentLength = -1;
- private int payLoadRead = 0;
-
- public interface MQTTFrameSink {
- void onFrame(MQTTFrame mqttFrame);
- }
private FrameParser currentParser;
- // Internal parsers implement this and we switch to the next as we go.
- private interface FrameParser {
-
- void parse(DataByteArrayInputStream data, int readSize) throws IOException;
+ private final Buffer scratch = new Buffer(8 * 1024);
+ private Buffer currentBuffer;
- void reset() throws IOException;
+ /**
+ * Sink for newly decoded MQTT Frames.
+ */
+ public interface MQTTFrameSink {
+ void onFrame(MQTTFrame mqttFrame);
}
public MQTTCodec(MQTTFrameSink sink) {
@@ -70,7 +66,16 @@ public class MQTTCodec {
}
private void processCommand() throws IOException {
- MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
+
+ Buffer frameContents = null;
+ if (currentBuffer == scratch) {
+ frameContents = scratch.deepCopy();
+ } else {
+ frameContents = currentBuffer;
+ currentBuffer = null;
+ }
+
+ MQTTFrame frame = new MQTTFrame(frameContents).header(header);
frameSink.onFrame(frame);
}
@@ -93,6 +98,13 @@ public class MQTTCodec {
//----- Frame parser implementations -------------------------------------//
+ private interface FrameParser {
+
+ void parse(DataByteArrayInputStream data, int readSize) throws IOException;
+
+ void reset() throws IOException;
+ }
+
private final FrameParser headerParser = new FrameParser() {
@Override
@@ -108,7 +120,9 @@ public class MQTTCodec {
header = b;
currentParser = initializeVariableLengthParser();
- currentParser.parse(data, readSize - 1);
+ if (readSize > 1) {
+ currentParser.parse(data, readSize - 1);
+ }
return;
}
}
@@ -116,32 +130,7 @@ public class MQTTCodec {
@Override
public void reset() throws IOException {
header = -1;
- }
- };
-
- private final FrameParser contentParser = new FrameParser() {
-
- @Override
- public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
- int i = 0;
- while (i++ < readSize) {
- currentCommand.write(data.readByte());
- payLoadRead++;
-
- if (payLoadRead == contentLength) {
- processCommand();
- currentParser = initializeHeaderParser();
- currentParser.parse(data, readSize - i);
- return;
- }
- }
- }
-
- @Override
- public void reset() throws IOException {
contentLength = -1;
- payLoadRead = 0;
- currentCommand.reset();
}
};
@@ -166,7 +155,11 @@ public class MQTTCodec {
currentParser = initializeContentParser();
contentLength = length;
}
- currentParser.parse(data, readSize - i);
+
+ readSize = readSize - i;
+ if (readSize > 0) {
+ currentParser.parse(data, readSize);
+ }
return;
}
}
@@ -179,4 +172,41 @@ public class MQTTCodec {
length = 0;
}
};
+
+ private final FrameParser contentParser = new FrameParser() {
+
+ private int payLoadRead = 0;
+
+ @Override
+ public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
+ if (currentBuffer == null) {
+ if (contentLength < scratch.length()) {
+ currentBuffer = scratch;
+ currentBuffer.length = contentLength;
+ } else {
+ currentBuffer = new Buffer(contentLength);
+ }
+ }
+
+ int length = Math.min(readSize, contentLength - payLoadRead);
+ payLoadRead += data.read(currentBuffer.data, payLoadRead, length);
+
+ if (payLoadRead == contentLength) {
+ processCommand();
+ currentParser = initializeHeaderParser();
+ readSize = readSize - payLoadRead;
+ if (readSize > 0) {
+ currentParser.parse(data, readSize);
+ }
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ contentLength = -1;
+ payLoadRead = 0;
+ scratch.reset();
+ currentBuffer = null;
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/052d2931/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
index 31af1ab..a1b087b 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
@@ -22,13 +22,18 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -46,6 +51,9 @@ public class MQTTCodecTest {
private List<MQTTFrame> frames;
private MQTTCodec codec;
+ private final int MESSAGE_SIZE = 5 * 1024 * 1024;
+ private final int ITERATIONS = 1000;
+
@Before
public void setUp() throws Exception {
frames = new ArrayList<MQTTFrame>();
@@ -81,6 +89,45 @@ public class MQTTCodecTest {
}
@Test
+ public void testConnectThenSubscribe() throws Exception {
+
+ CONNECT connect = new CONNECT();
+ connect.cleanSession(true);
+ connect.clientId(new UTF8Buffer(""));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(connect.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+ codec.parse(input, marshalled.length());
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(1, frames.size());
+
+ connect = new CONNECT().decode(frames.get(0));
+ LOG.info("Unmarshalled: {}", connect);
+ assertTrue(connect.cleanSession());
+
+ frames.clear();
+
+ SUBSCRIBE subscribe = new SUBSCRIBE();
+ subscribe.topics(new Topic[] {new Topic("TEST", QoS.EXACTLY_ONCE) });
+
+ output = new DataByteArrayOutputStream();
+ wireFormat.marshal(subscribe.encode(), output);
+ marshalled = output.toBuffer();
+
+ input = new DataByteArrayInputStream(marshalled);
+ codec.parse(input, marshalled.length());
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(1, frames.size());
+
+ subscribe = new SUBSCRIBE().decode(frames.get(0));
+ }
+
+ @Test
public void testConnectWithCredentialsBackToBack() throws Exception {
CONNECT connect = new CONNECT();
@@ -175,4 +222,71 @@ public class MQTTCodecTest {
assertEquals("pass", connect.password().toString());
assertEquals("test", connect.clientId().toString());
}
+
+ @Test
+ public void testMessageDecoding() throws Exception {
+
+ byte[] CONTENTS = new byte[MESSAGE_SIZE];
+ for (int i = 0; i < MESSAGE_SIZE; i++) {
+ CONTENTS[i] = 'a';
+ }
+
+ PUBLISH publish = new PUBLISH();
+
+ publish.dup(false);
+ publish.messageId((short) 127);
+ publish.qos(QoS.AT_LEAST_ONCE);
+ publish.payload(new Buffer(CONTENTS));
+ publish.topicName(new UTF8Buffer("TOPIC"));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(publish.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+ codec.parse(input, marshalled.length());
+
+ assertTrue(!frames.isEmpty());
+ assertEquals(1, frames.size());
+
+ publish = new PUBLISH().decode(frames.get(0));
+ assertFalse(publish.dup());
+ assertEquals(MESSAGE_SIZE, publish.payload().length());
+ }
+
+ @Test
+ public void testMessageDecodingPerformance() throws Exception {
+
+ byte[] CONTENTS = new byte[MESSAGE_SIZE];
+ for (int i = 0; i < MESSAGE_SIZE; i++) {
+ CONTENTS[i] = 'a';
+ }
+
+ PUBLISH publish = new PUBLISH();
+
+ publish.dup(false);
+ publish.messageId((short) 127);
+ publish.qos(QoS.AT_LEAST_ONCE);
+ publish.payload(new Buffer(CONTENTS));
+ publish.topicName(new UTF8Buffer("TOPIC"));
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ wireFormat.marshal(publish.encode(), output);
+ Buffer marshalled = output.toBuffer();
+
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled);
+ codec.parse(input, marshalled.length());
+
+ assertTrue(!frames.isEmpty());
+ publish = new PUBLISH().decode(frames.get(0));
+ frames.clear();
+ }
+
+ long duration = System.currentTimeMillis() - startTime;
+
+ LOG.info("Total time to process: {}", TimeUnit.MILLISECONDS.toSeconds(duration));
+ }
}