You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/09/14 14:54:17 UTC
[1/2] activemq-artemis git commit: ARTEMIS-200 Message Compression
Support
Repository: activemq-artemis
Updated Branches:
refs/heads/master 6408fd035 -> cbb6c63d0
ARTEMIS-200 Message Compression Support
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0abf5246
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0abf5246
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0abf5246
Branch: refs/heads/master
Commit: 0abf52468ba22004fc51e4314275259f97caa4dd
Parents: 6408fd0
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Sep 11 15:29:49 2015 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Mon Sep 14 09:56:19 2015 +0800
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 177 +++++++++++--
.../command/MessageCompressionTest.java | 207 +++++++++++++++
.../openwire/interop/CompressedInteropTest.java | 263 +++++++++++++++++++
3 files changed, 631 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0abf5246/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 717ca8e..bda4c22 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -22,10 +22,17 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import java.util.zip.InflaterOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
@@ -55,6 +62,7 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;
@@ -85,8 +93,8 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID";
private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
- private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
+ private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
@Override
public ServerMessage inbound(Object message) {
@@ -118,9 +126,17 @@ public class OpenWireMessageConverter implements MessageConverter {
ByteSequence contents = messageSend.getContent();
if (contents != null) {
ActiveMQBuffer body = coreMessage.getBodyBuffer();
+ boolean messageCompressed = messageSend.isCompressed();
+ if (messageCompressed) {
+ coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
+ }
+
switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
- ByteArrayInputStream tis = new ByteArrayInputStream(contents);
+ InputStream tis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ tis = new InflaterInputStream(tis);
+ }
DataInputStream tdataIn = new DataInputStream(tis);
String text = MarshallingSupport.readUTF8(tdataIn);
tdataIn.close();
@@ -128,6 +144,9 @@ public class OpenWireMessageConverter implements MessageConverter {
break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
InputStream mis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ mis = new InflaterInputStream(mis);
+ }
DataInputStream mdataIn = new DataInputStream(mis);
Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
mdataIn.close();
@@ -136,11 +155,33 @@ public class OpenWireMessageConverter implements MessageConverter {
props.encode(body);
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
+ if (messageCompressed) {
+ InputStream ois = new ByteArrayInputStream(contents);
+ ois = new InflaterInputStream(ois);
+ org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+
+ try {
+ byte[] buf = new byte[1024];
+ int n = ois.read(buf);
+ while (n != -1) {
+ decompressed.write(buf, 0, n);
+ n = ois.read();
+ }
+ //read done
+ contents = decompressed.toByteSequence();
+ }
+ finally {
+ decompressed.close();
+ }
+ }
body.writeInt(contents.length);
body.writeBytes(contents.data, contents.offset, contents.length);
break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
InputStream sis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ sis = new InflaterInputStream(sis);
+ }
DataInputStream sdis = new DataInputStream(sis);
int stype = sdis.read();
while (stype != -1) {
@@ -210,7 +251,47 @@ public class OpenWireMessageConverter implements MessageConverter {
}
sdis.close();
break;
+ case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
+ if (messageCompressed) {
+ Inflater inflater = new Inflater();
+ org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+ try {
+ int length = ByteSequenceData.readIntBig(contents);
+ contents.offset = 0;
+ byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
+
+ inflater.setInput(data);
+ byte[] buffer = new byte[length];
+ int count = inflater.inflate(buffer);
+ decompressed.write(buffer, 0, count);
+ contents = decompressed.toByteSequence();
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ finally {
+ inflater.end();
+ decompressed.close();
+ }
+ }
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ break;
default:
+ if (messageCompressed) {
+ org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream os = new InflaterOutputStream(decompressed);
+ try {
+ os.write(contents.data, contents.offset, contents.getLength());
+ contents = decompressed.toByteSequence();
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ finally {
+ os.close();
+ decompressed.close();
+ }
+ }
body.writeBytes(contents.data, contents.offset, contents.length);
break;
}
@@ -317,7 +398,6 @@ public class OpenWireMessageConverter implements MessageConverter {
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
}
- coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageSend.isCompressed());
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
}
@@ -357,7 +437,7 @@ public class OpenWireMessageConverter implements MessageConverter {
public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount,
- AMQConsumer consumer) throws IOException {
+ AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
MessageDispatch md = new MessageDispatch();
@@ -412,18 +492,25 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setBrokerInTime(brokerInTime);
ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
+ Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+ boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+ amqMsg.setCompressed(isCompressed);
+
if (buffer != null) {
buffer.resetReaderIndex();
byte[] bytes = null;
synchronized (buffer) {
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
SimpleString text = buffer.readNullableSimpleString();
-
if (text != null) {
- ByteArrayOutputStream out = new ByteArrayOutputStream(text.length() + 4);
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(out);
+ }
DataOutputStream dataOut = new DataOutputStream(out);
MarshallingSupport.writeUTF8(dataOut, text.toString());
- bytes = out.toByteArray();
+ bytes = bytesOut.toByteArray();
out.close();
}
}
@@ -433,18 +520,33 @@ public class OpenWireMessageConverter implements MessageConverter {
Map<String, Object> map = mapData.getMap();
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
- DataOutputStream dataOut = new DataOutputStream(out);
+ OutputStream os = out;
+ if (isCompressed) {
+ os = new DeflaterOutputStream(os);
+ }
+ DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
- bytes = out.toByteArray();
dataOut.close();
+ bytes = out.toByteArray();
}
else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
int len = buffer.readInt();
bytes = new byte[len];
buffer.readBytes(bytes);
+ if (isCompressed) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
+ out.write(bytes);
+ out.close();
+ bytes = bytesOut.toByteArray();
+ }
}
else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
- ByteArrayOutputStream out = new ByteArrayOutputStream(buffer.readableBytes());
+ org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(bytesOut);
+ }
DataOutputStream dataOut = new DataOutputStream(out);
boolean stop = false;
@@ -499,13 +601,52 @@ public class OpenWireMessageConverter implements MessageConverter {
break;
}
}
- bytes = out.toByteArray();
dataOut.close();
+ bytes = bytesOut.toByteArray();
+ }
+ else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+ int n = buffer.readableBytes();
+ bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ int length = bytes.length;
+ org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream();
+ compressed.write(new byte[4]);
+ Deflater deflater = new Deflater();
+ try {
+ deflater.setInput(bytes);
+ deflater.finish();
+ byte[] bytesBuf = new byte[1024];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(bytesBuf);
+ compressed.write(bytesBuf, 0, count);
+ }
+ ByteSequence byteSeq = compressed.toByteSequence();
+ ByteSequenceData.writeIntBig(byteSeq, length);
+ bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+ }
+ finally {
+ deflater.end();
+ compressed.close();
+ }
+ }
}
else {
int n = buffer.readableBytes();
bytes = new byte[n];
buffer.readBytes(bytes);
+ if (isCompressed) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
+ try {
+ out.write(bytes);
+ bytes = bytesOut.toByteArray();
+ }
+ finally {
+ out.close();
+ bytesOut.close();
+ }
+ }
}
buffer.resetReaderIndex();// this is important for topics as the buffer
@@ -642,10 +783,6 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setUserID(userId);
}
- Boolean isCompressed = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
- if (isCompressed != null) {
- amqMsg.setCompressed(isCompressed);
- }
Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
if (isDroppable != null) {
amqMsg.setDroppable(isDroppable);
@@ -660,11 +797,12 @@ public class OpenWireMessageConverter implements MessageConverter {
throw new IOException("failure to set dlq property " + dlqCause, e);
}
}
+
Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) {
for (SimpleString s : props) {
String keyStr = s.toString();
- if (keyStr.startsWith("__AMQ") || keyStr.startsWith("__HDR_")) {
+ if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) {
continue;
}
Object prop = coreMessage.getObjectProperty(s);
@@ -681,6 +819,13 @@ public class OpenWireMessageConverter implements MessageConverter {
}
}
}
+ try {
+ amqMsg.onSend();
+ amqMsg.setCompressed(isCompressed);
+ }
+ catch (JMSException e) {
+ throw new IOException("Failed to covert to Openwire message", e);
+ }
return amqMsg;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0abf5246/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index 6b5cc45..fc4182b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -20,9 +20,12 @@ import java.io.UnsupportedEncodingException;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
+import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Session;
+import javax.jms.StreamMessage;
import junit.framework.TestCase;
@@ -66,6 +69,7 @@ public class MessageCompressionTest extends TestCase {
sendTestMessage(factory, TEXT);
message = receiveTestMessage(factory);
int unCompressedSize = message.getContent().getLength();
+ assertEquals(TEXT, message.getText());
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
}
@@ -93,6 +97,209 @@ public class MessageCompressionTest extends TestCase {
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
}
+ public void testMapMessageCompression() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setUseCompression(true);
+
+ sendTestMapMessage(factory, TEXT);
+ ActiveMQMapMessage mapMessage = receiveTestMapMessage(factory);
+ int compressedSize = mapMessage.getContent().getLength();
+ assertTrue(mapMessage.isCompressed());
+
+ boolean booleanVal = mapMessage.getBoolean("boolean-type");
+ assertTrue(booleanVal);
+ byte byteVal = mapMessage.getByte("byte-type");
+ assertEquals((byte)10, byteVal);
+ byte[] bytesVal = mapMessage.getBytes("bytes-type");
+ byte[] originVal = TEXT.getBytes();
+ assertEquals(originVal.length, bytesVal.length);
+ for (int i = 0; i < bytesVal.length; i++) {
+ assertTrue(bytesVal[i] == originVal[i]);
+ }
+ char charVal = mapMessage.getChar("char-type");
+ assertEquals('A', charVal);
+ double doubleVal = mapMessage.getDouble("double-type");
+ assertEquals(55.3D, doubleVal, 0.1D);
+ float floatVal = mapMessage.getFloat("float-type");
+ assertEquals(79.1F, floatVal, 0.1F);
+ int intVal = mapMessage.getInt("int-type");
+ assertEquals(37, intVal);
+ long longVal = mapMessage.getLong("long-type");
+ assertEquals(56652L, longVal);
+ Object objectVal = mapMessage.getObject("object-type");
+ Object origVal = new String("VVVV");
+ assertTrue(objectVal.equals(origVal));
+ short shortVal = mapMessage.getShort("short-type");
+ assertEquals((short) 333, shortVal);
+ String strVal = mapMessage.getString("string-type");
+ assertEquals(TEXT, strVal);
+
+ factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setUseCompression(false);
+ sendTestMapMessage(factory, TEXT);
+ mapMessage = receiveTestMapMessage(factory);
+ int unCompressedSize = mapMessage.getContent().getLength();
+
+ assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
+ }
+
+ public void testStreamMessageCompression() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setUseCompression(true);
+
+ sendTestStreamMessage(factory, TEXT);
+ ActiveMQStreamMessage streamMessage = receiveTestStreamMessage(factory);
+ int compressedSize = streamMessage.getContent().getLength();
+ assertTrue(streamMessage.isCompressed());
+
+ boolean booleanVal = streamMessage.readBoolean();
+ assertTrue(booleanVal);
+ byte byteVal = streamMessage.readByte();
+ assertEquals((byte)10, byteVal);
+ byte[] originVal = TEXT.getBytes();
+ byte[] bytesVal = new byte[originVal.length];
+ streamMessage.readBytes(bytesVal);
+ for (int i = 0; i < bytesVal.length; i++) {
+ assertTrue(bytesVal[i] == originVal[i]);
+ }
+ char charVal = streamMessage.readChar();
+ assertEquals('A', charVal);
+ double doubleVal = streamMessage.readDouble();
+ assertEquals(55.3D, doubleVal, 0.1D);
+ float floatVal = streamMessage.readFloat();
+ assertEquals(79.1F, floatVal, 0.1F);
+ int intVal = streamMessage.readInt();
+ assertEquals(37, intVal);
+ long longVal = streamMessage.readLong();
+ assertEquals(56652L, longVal);
+ Object objectVal = streamMessage.readObject();
+ Object origVal = new String("VVVV");
+ assertTrue(objectVal.equals(origVal));
+ short shortVal = streamMessage.readShort();
+ assertEquals((short) 333, shortVal);
+ String strVal = streamMessage.readString();
+ assertEquals(TEXT, strVal);
+
+ factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setUseCompression(false);
+ sendTestStreamMessage(factory, TEXT);
+ streamMessage = receiveTestStreamMessage(factory);
+ int unCompressedSize = streamMessage.getContent().getLength();
+
+ System.out.println("compressedSize: " + compressedSize + " un: " + unCompressedSize);
+ assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
+ }
+
+ public void testObjectMessageCompression() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setUseCompression(true);
+
+ sendTestObjectMessage(factory, TEXT);
+ ActiveMQObjectMessage objectMessage = receiveTestObjectMessage(factory);
+ int compressedSize = objectMessage.getContent().getLength();
+ assertTrue(objectMessage.isCompressed());
+
+ Object objectVal = objectMessage.getObject();
+ assertEquals(TEXT, objectVal);
+
+ factory = new ActiveMQConnectionFactory(connectionUri);
+ factory.setUseCompression(false);
+ sendTestObjectMessage(factory, TEXT);
+ objectMessage = receiveTestObjectMessage(factory);
+ int unCompressedSize = objectMessage.getContent().getLength();
+
+ assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
+ }
+
+ private void sendTestObjectMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ ObjectMessage objectMessage = session.createObjectMessage();
+
+ objectMessage.setObject(TEXT);
+
+ producer.send(objectMessage);
+ connection.close();
+ }
+
+ private ActiveMQObjectMessage receiveTestObjectMessage(ActiveMQConnectionFactory factory) throws JMSException {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ ActiveMQObjectMessage rc = (ActiveMQObjectMessage) consumer.receive();
+ connection.close();
+ return rc;
+ }
+
+ private void sendTestStreamMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ StreamMessage streamMessage = session.createStreamMessage();
+
+ streamMessage.writeBoolean(true);
+ streamMessage.writeByte((byte) 10);
+ streamMessage.writeBytes(TEXT.getBytes());
+ streamMessage.writeChar('A');
+ streamMessage.writeDouble(55.3D);
+ streamMessage.writeFloat(79.1F);
+ streamMessage.writeInt(37);
+ streamMessage.writeLong(56652L);
+ streamMessage.writeObject(new String("VVVV"));
+ streamMessage.writeShort((short) 333);
+ streamMessage.writeString(TEXT);
+
+ producer.send(streamMessage);
+ connection.close();
+ }
+
+ private ActiveMQStreamMessage receiveTestStreamMessage(ActiveMQConnectionFactory factory) throws JMSException {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ ActiveMQStreamMessage rc = (ActiveMQStreamMessage) consumer.receive();
+ connection.close();
+ return rc;
+ }
+
+ private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ MapMessage mapMessage = session.createMapMessage();
+
+ mapMessage.setBoolean("boolean-type", true);
+ mapMessage.setByte("byte-type", (byte) 10);
+ mapMessage.setBytes("bytes-type", TEXT.getBytes());
+ mapMessage.setChar("char-type", 'A');
+ mapMessage.setDouble("double-type", 55.3D);
+ mapMessage.setFloat("float-type", 79.1F);
+ mapMessage.setInt("int-type", 37);
+ mapMessage.setLong("long-type", 56652L);
+ mapMessage.setObject("object-type", new String("VVVV"));
+ mapMessage.setShort("short-type", (short) 333);
+ mapMessage.setString("string-type", TEXT);
+
+ producer.send(mapMessage);
+ connection.close();
+ }
+
+ private ActiveMQMapMessage receiveTestMapMessage(ActiveMQConnectionFactory factory) throws JMSException {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ ActiveMQMapMessage rc = (ActiveMQMapMessage) consumer.receive();
+ connection.close();
+ return rc;
+ }
+
private void sendTestMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0abf5246/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
new file mode 100644
index 0000000..2123f53
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.interop;
+
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public class CompressedInteropTest extends BasicOpenWireTest {
+
+ private static final String TEXT;
+ static {
+ StringBuilder builder = new StringBuilder();
+
+ for (int i = 0; i < 20; i++) {
+ builder.append("The quick red fox jumped over the lazy brown dog. ");
+ }
+ TEXT = builder.toString();
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ factory.setUseCompression(true);
+ super.setUp();
+ connection.start();
+ assertTrue(connection.isUseCompression());
+ }
+
+ @Test
+ public void testCoreReceiveOpenWireCompressedMessages() throws Exception {
+ //TextMessage
+ sendCompressedTextMessageUsingOpenWire();
+ receiveTextMessageUsingCore();
+ //BytesMessage
+ sendCompressedBytesMessageUsingOpenWire();
+ receiveBytesMessageUsingCore();
+ //MapMessage
+ sendCompressedMapMessageUsingOpenWire();
+ receiveMapMessageUsingCore();
+ //StreamMessage
+ sendCompressedStreamMessageUsingOpenWire();
+ receiveStreamMessageUsingCore();
+ //ObjectMessage
+ sendCompressedObjectMessageUsingOpenWire();
+ receiveObjectMessageUsingCore();
+ }
+
+ private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+ StreamMessage streamMessage = session.createStreamMessage();
+
+ streamMessage.writeBoolean(true);
+ streamMessage.writeByte((byte) 10);
+ streamMessage.writeBytes(TEXT.getBytes());
+ streamMessage.writeChar('A');
+ streamMessage.writeDouble(55.3D);
+ streamMessage.writeFloat(79.1F);
+ streamMessage.writeInt(37);
+ streamMessage.writeLong(56652L);
+ streamMessage.writeObject(new String("VVVV"));
+ streamMessage.writeShort((short) 333);
+ streamMessage.writeString(TEXT);
+
+ producer.send(streamMessage);
+ }
+
+ private void receiveStreamMessageUsingCore() throws Exception {
+ StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore();
+ boolean booleanVal = streamMessage.readBoolean();
+ assertTrue(booleanVal);
+ byte byteVal = streamMessage.readByte();
+ assertEquals((byte)10, byteVal);
+ byte[] originVal = TEXT.getBytes();
+ byte[] bytesVal = new byte[originVal.length];
+ streamMessage.readBytes(bytesVal);
+ for (int i = 0; i < bytesVal.length; i++) {
+ assertTrue(bytesVal[i] == originVal[i]);
+ }
+ char charVal = streamMessage.readChar();
+ assertEquals('A', charVal);
+ double doubleVal = streamMessage.readDouble();
+ assertEquals(55.3D, doubleVal, 0.1D);
+ float floatVal = streamMessage.readFloat();
+ assertEquals(79.1F, floatVal, 0.1F);
+ int intVal = streamMessage.readInt();
+ assertEquals(37, intVal);
+ long longVal = streamMessage.readLong();
+ assertEquals(56652L, longVal);
+ Object objectVal = streamMessage.readObject();
+ Object origVal = new String("VVVV");
+ assertTrue(objectVal.equals(origVal));
+ short shortVal = streamMessage.readShort();
+ assertEquals((short) 333, shortVal);
+ String strVal = streamMessage.readString();
+ assertEquals(TEXT, strVal);
+ }
+
+ private void sendCompressedObjectMessageUsingOpenWire() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+ ObjectMessage objectMessage = session.createObjectMessage();
+ objectMessage.setObject(TEXT);
+
+ producer.send(objectMessage);
+ }
+
+ private void receiveObjectMessageUsingCore() throws Exception {
+ ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore();
+ Object objectVal = objectMessage.getObject();
+ assertEquals(TEXT, objectVal);
+ }
+
+ private void sendCompressedMapMessageUsingOpenWire() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+ MapMessage mapMessage = session.createMapMessage();
+
+ mapMessage.setBoolean("boolean-type", true);
+ mapMessage.setByte("byte-type", (byte) 10);
+ mapMessage.setBytes("bytes-type", TEXT.getBytes());
+ mapMessage.setChar("char-type", 'A');
+ mapMessage.setDouble("double-type", 55.3D);
+ mapMessage.setFloat("float-type", 79.1F);
+ mapMessage.setInt("int-type", 37);
+ mapMessage.setLong("long-type", 56652L);
+ mapMessage.setObject("object-type", new String("VVVV"));
+ mapMessage.setShort("short-type", (short) 333);
+ mapMessage.setString("string-type", TEXT);
+
+ producer.send(mapMessage);
+ }
+
+ private void receiveMapMessageUsingCore() throws Exception {
+ MapMessage mapMessage = (MapMessage) receiveMessageUsingCore();
+
+ boolean booleanVal = mapMessage.getBoolean("boolean-type");
+ assertTrue(booleanVal);
+ byte byteVal = mapMessage.getByte("byte-type");
+ assertEquals((byte)10, byteVal);
+ byte[] bytesVal = mapMessage.getBytes("bytes-type");
+ byte[] originVal = TEXT.getBytes();
+ assertEquals(originVal.length, bytesVal.length);
+ for (int i = 0; i < bytesVal.length; i++) {
+ assertTrue(bytesVal[i] == originVal[i]);
+ }
+ char charVal = mapMessage.getChar("char-type");
+ assertEquals('A', charVal);
+ double doubleVal = mapMessage.getDouble("double-type");
+ assertEquals(55.3D, doubleVal, 0.1D);
+ float floatVal = mapMessage.getFloat("float-type");
+ assertEquals(79.1F, floatVal, 0.1F);
+ int intVal = mapMessage.getInt("int-type");
+ assertEquals(37, intVal);
+ long longVal = mapMessage.getLong("long-type");
+ assertEquals(56652L, longVal);
+ Object objectVal = mapMessage.getObject("object-type");
+ Object origVal = new String("VVVV");
+ assertTrue(objectVal.equals(origVal));
+ short shortVal = mapMessage.getShort("short-type");
+ assertEquals((short) 333, shortVal);
+ String strVal = mapMessage.getString("string-type");
+ assertEquals(TEXT, strVal);
+ }
+
+ private void sendCompressedBytesMessageUsingOpenWire() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(TEXT.getBytes());
+
+ producer.send(bytesMessage);
+ }
+
+ private void receiveBytesMessageUsingCore() throws Exception {
+ BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore();
+
+ byte[] bytes = new byte[TEXT.getBytes("UTF8").length];
+ bytesMessage.readBytes(bytes);
+ assertTrue(bytesMessage.readBytes(new byte[255]) == -1);
+
+ String rcvString = new String(bytes, "UTF8");
+ assertEquals(TEXT, rcvString);
+ }
+
+ private void receiveTextMessageUsingCore() throws Exception {
+ TextMessage txtMessage = (TextMessage) receiveMessageUsingCore();
+ assertEquals(TEXT, txtMessage.getText());
+ }
+
+ private Message receiveMessageUsingCore() throws Exception {
+ Connection jmsConn = null;
+ Message message = null;
+ try {
+ jmsConn = coreCf.createConnection();
+ jmsConn.start();
+
+ Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(this.queueName);
+ MessageConsumer coreConsumer = session.createConsumer(queue);
+
+ message = coreConsumer.receive(5000);
+ }
+ finally {
+ if (jmsConn != null) {
+ jmsConn.close();
+ }
+ }
+ return message;
+ }
+
+ private void sendCompressedTextMessageUsingOpenWire() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+ TextMessage textMessage = session.createTextMessage(TEXT);
+
+ producer.send(textMessage);
+ }
+
+}
[2/2] activemq-artemis git commit: This closes #163
Posted by cl...@apache.org.
This closes #163
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cbb6c63d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cbb6c63d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cbb6c63d
Branch: refs/heads/master
Commit: cbb6c63d0063efb2afb6fbf2ddbc533a02425091
Parents: 6408fd0 0abf524
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Sep 14 08:54:01 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 14 08:54:01 2015 -0400
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 177 +++++++++++--
.../command/MessageCompressionTest.java | 207 +++++++++++++++
.../openwire/interop/CompressedInteropTest.java | 263 +++++++++++++++++++
3 files changed, 631 insertions(+), 16 deletions(-)
----------------------------------------------------------------------