You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/09/14 14:54:17 UTC

[1/2] activemq-artemis git commit: ARTEMIS-200 Message Compression Support

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6408fd035 -> cbb6c63d0


ARTEMIS-200 Message Compression Support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0abf5246
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0abf5246
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0abf5246

Branch: refs/heads/master
Commit: 0abf52468ba22004fc51e4314275259f97caa4dd
Parents: 6408fd0
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Sep 11 15:29:49 2015 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Mon Sep 14 09:56:19 2015 +0800

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 177 +++++++++++--
 .../command/MessageCompressionTest.java         | 207 +++++++++++++++
 .../openwire/interop/CompressedInteropTest.java | 263 +++++++++++++++++++
 3 files changed, 631 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0abf5246/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 717ca8e..bda4c22 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -22,10 +22,17 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import java.util.zip.InflaterOutputStream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
@@ -55,6 +62,7 @@ import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.UTF8Buffer;
@@ -85,8 +93,8 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID";
    private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
 
-   private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
    private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
+   private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
 
    @Override
    public ServerMessage inbound(Object message) {
@@ -118,9 +126,17 @@ public class OpenWireMessageConverter implements MessageConverter {
       ByteSequence contents = messageSend.getContent();
       if (contents != null) {
          ActiveMQBuffer body = coreMessage.getBodyBuffer();
+         boolean messageCompressed = messageSend.isCompressed();
+         if (messageCompressed) {
+            coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
+         }
+
          switch (coreType) {
             case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
-               ByteArrayInputStream tis = new ByteArrayInputStream(contents);
+               InputStream tis = new ByteArrayInputStream(contents);
+               if (messageCompressed) {
+                  tis = new InflaterInputStream(tis);
+               }
                DataInputStream tdataIn = new DataInputStream(tis);
                String text = MarshallingSupport.readUTF8(tdataIn);
                tdataIn.close();
@@ -128,6 +144,9 @@ public class OpenWireMessageConverter implements MessageConverter {
                break;
             case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
                InputStream mis = new ByteArrayInputStream(contents);
+               if (messageCompressed) {
+                  mis = new InflaterInputStream(mis);
+               }
                DataInputStream mdataIn = new DataInputStream(mis);
                Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
                mdataIn.close();
@@ -136,11 +155,33 @@ public class OpenWireMessageConverter implements MessageConverter {
                props.encode(body);
                break;
             case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
+               if (messageCompressed) {
+                  InputStream ois = new ByteArrayInputStream(contents);
+                  ois = new InflaterInputStream(ois);
+                  org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+
+                  try {
+                     byte[] buf = new byte[1024];
+                     int n = ois.read(buf);
+                     while (n != -1) {
+                        decompressed.write(buf, 0, n);
+                        n = ois.read();
+                     }
+                     //read done
+                     contents = decompressed.toByteSequence();
+                  }
+                  finally {
+                     decompressed.close();
+                  }
+               }
                body.writeInt(contents.length);
                body.writeBytes(contents.data, contents.offset, contents.length);
                break;
             case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
                InputStream sis = new ByteArrayInputStream(contents);
+               if (messageCompressed) {
+                  sis = new InflaterInputStream(sis);
+               }
                DataInputStream sdis = new DataInputStream(sis);
                int stype = sdis.read();
                while (stype != -1) {
@@ -210,7 +251,47 @@ public class OpenWireMessageConverter implements MessageConverter {
                }
                sdis.close();
                break;
+            case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
+               if (messageCompressed) {
+                  Inflater inflater = new Inflater();
+                  org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+                  try {
+                     int length = ByteSequenceData.readIntBig(contents);
+                     contents.offset = 0;
+                     byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
+
+                     inflater.setInput(data);
+                     byte[] buffer = new byte[length];
+                     int count = inflater.inflate(buffer);
+                     decompressed.write(buffer, 0, count);
+                     contents = decompressed.toByteSequence();
+                  }
+                  catch (Exception e) {
+                     throw new IOException(e);
+                  }
+                  finally {
+                     inflater.end();
+                     decompressed.close();
+                  }
+               }
+               body.writeBytes(contents.data, contents.offset, contents.length);
+               break;
             default:
+               if (messageCompressed) {
+                  org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+                  OutputStream os = new InflaterOutputStream(decompressed);
+                  try {
+                     os.write(contents.data, contents.offset, contents.getLength());
+                     contents = decompressed.toByteSequence();
+                  }
+                  catch (Exception e) {
+                     throw new IOException(e);
+                  }
+                  finally {
+                     os.close();
+                     decompressed.close();
+                  }
+               }
                body.writeBytes(contents.data, contents.offset, contents.length);
                break;
          }
@@ -317,7 +398,6 @@ public class OpenWireMessageConverter implements MessageConverter {
       if (userId != null) {
          coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
       }
-      coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageSend.isCompressed());
       coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
    }
 
@@ -357,7 +437,7 @@ public class OpenWireMessageConverter implements MessageConverter {
 
    public static MessageDispatch createMessageDispatch(ServerMessage message,
                                                        int deliveryCount,
-                                                       AMQConsumer consumer) throws IOException {
+                                                       AMQConsumer consumer) throws IOException, JMSException {
       ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
 
       MessageDispatch md = new MessageDispatch();
@@ -412,18 +492,25 @@ public class OpenWireMessageConverter implements MessageConverter {
       amqMsg.setBrokerInTime(brokerInTime);
 
       ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
+      Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+      boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+      amqMsg.setCompressed(isCompressed);
+
       if (buffer != null) {
          buffer.resetReaderIndex();
          byte[] bytes = null;
          synchronized (buffer) {
             if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
                SimpleString text = buffer.readNullableSimpleString();
-
                if (text != null) {
-                  ByteArrayOutputStream out = new ByteArrayOutputStream(text.length() + 4);
+                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
+                  OutputStream out = bytesOut;
+                  if (isCompressed) {
+                     out = new DeflaterOutputStream(out);
+                  }
                   DataOutputStream dataOut = new DataOutputStream(out);
                   MarshallingSupport.writeUTF8(dataOut, text.toString());
-                  bytes = out.toByteArray();
+                  bytes = bytesOut.toByteArray();
                   out.close();
                }
             }
@@ -433,18 +520,33 @@ public class OpenWireMessageConverter implements MessageConverter {
 
                Map<String, Object> map = mapData.getMap();
                ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
-               DataOutputStream dataOut = new DataOutputStream(out);
+               OutputStream os = out;
+               if (isCompressed) {
+                  os = new DeflaterOutputStream(os);
+               }
+               DataOutputStream dataOut = new DataOutputStream(os);
                MarshallingSupport.marshalPrimitiveMap(map, dataOut);
-               bytes = out.toByteArray();
                dataOut.close();
+               bytes = out.toByteArray();
             }
             else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
                int len = buffer.readInt();
                bytes = new byte[len];
                buffer.readBytes(bytes);
+               if (isCompressed) {
+                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                  DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
+                  out.write(bytes);
+                  out.close();
+                  bytes = bytesOut.toByteArray();
+               }
             }
             else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
-               ByteArrayOutputStream out = new ByteArrayOutputStream(buffer.readableBytes());
+               org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+               OutputStream out = bytesOut;
+               if (isCompressed) {
+                  out = new DeflaterOutputStream(bytesOut);
+               }
                DataOutputStream dataOut = new DataOutputStream(out);
 
                boolean stop = false;
@@ -499,13 +601,52 @@ public class OpenWireMessageConverter implements MessageConverter {
                         break;
                   }
                }
-               bytes = out.toByteArray();
                dataOut.close();
+               bytes = bytesOut.toByteArray();
+            }
+            else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+               int n = buffer.readableBytes();
+               bytes = new byte[n];
+               buffer.readBytes(bytes);
+               if (isCompressed) {
+                  int length = bytes.length;
+                  org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream();
+                  compressed.write(new byte[4]);
+                  Deflater deflater = new Deflater();
+                  try {
+                     deflater.setInput(bytes);
+                     deflater.finish();
+                     byte[] bytesBuf = new byte[1024];
+                     while (!deflater.finished()) {
+                        int count = deflater.deflate(bytesBuf);
+                        compressed.write(bytesBuf, 0, count);
+                     }
+                     ByteSequence byteSeq = compressed.toByteSequence();
+                     ByteSequenceData.writeIntBig(byteSeq, length);
+                     bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+                  }
+                  finally {
+                     deflater.end();
+                     compressed.close();
+                  }
+               }
             }
             else {
                int n = buffer.readableBytes();
                bytes = new byte[n];
                buffer.readBytes(bytes);
+               if (isCompressed) {
+                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                  DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
+                  try {
+                     out.write(bytes);
+                     bytes = bytesOut.toByteArray();
+                  }
+                  finally {
+                     out.close();
+                     bytesOut.close();
+                  }
+               }
             }
 
             buffer.resetReaderIndex();// this is important for topics as the buffer
@@ -642,10 +783,6 @@ public class OpenWireMessageConverter implements MessageConverter {
          amqMsg.setUserID(userId);
       }
 
-      Boolean isCompressed = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
-      if (isCompressed != null) {
-         amqMsg.setCompressed(isCompressed);
-      }
       Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
       if (isDroppable != null) {
          amqMsg.setDroppable(isDroppable);
@@ -660,11 +797,12 @@ public class OpenWireMessageConverter implements MessageConverter {
             throw new IOException("failure to set dlq property " + dlqCause, e);
          }
       }
+
       Set<SimpleString> props = coreMessage.getPropertyNames();
       if (props != null) {
          for (SimpleString s : props) {
             String keyStr = s.toString();
-            if (keyStr.startsWith("__AMQ") || keyStr.startsWith("__HDR_")) {
+            if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) {
                continue;
             }
             Object prop = coreMessage.getObjectProperty(s);
@@ -681,6 +819,13 @@ public class OpenWireMessageConverter implements MessageConverter {
             }
          }
       }
+      try {
+         amqMsg.onSend();
+         amqMsg.setCompressed(isCompressed);
+      }
+      catch (JMSException e) {
+         throw new IOException("Failed to covert to Openwire message", e);
+      }
       return amqMsg;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0abf5246/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index 6b5cc45..fc4182b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -20,9 +20,12 @@ import java.io.UnsupportedEncodingException;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Session;
+import javax.jms.StreamMessage;
 
 import junit.framework.TestCase;
 
@@ -66,6 +69,7 @@ public class MessageCompressionTest extends TestCase {
       sendTestMessage(factory, TEXT);
       message = receiveTestMessage(factory);
       int unCompressedSize = message.getContent().getLength();
+      assertEquals(TEXT, message.getText());
 
       assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
    }
@@ -93,6 +97,209 @@ public class MessageCompressionTest extends TestCase {
       assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
    }
 
+   public void testMapMessageCompression() throws Exception {
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+      factory.setUseCompression(true);
+
+      sendTestMapMessage(factory, TEXT);
+      ActiveMQMapMessage mapMessage = receiveTestMapMessage(factory);
+      int compressedSize = mapMessage.getContent().getLength();
+      assertTrue(mapMessage.isCompressed());
+
+      boolean booleanVal = mapMessage.getBoolean("boolean-type");
+      assertTrue(booleanVal);
+      byte byteVal = mapMessage.getByte("byte-type");
+      assertEquals((byte)10, byteVal);
+      byte[] bytesVal = mapMessage.getBytes("bytes-type");
+      byte[] originVal = TEXT.getBytes();
+      assertEquals(originVal.length, bytesVal.length);
+      for (int i = 0; i < bytesVal.length; i++) {
+         assertTrue(bytesVal[i] == originVal[i]);
+      }
+      char charVal = mapMessage.getChar("char-type");
+      assertEquals('A', charVal);
+      double doubleVal = mapMessage.getDouble("double-type");
+      assertEquals(55.3D, doubleVal, 0.1D);
+      float floatVal = mapMessage.getFloat("float-type");
+      assertEquals(79.1F, floatVal, 0.1F);
+      int intVal = mapMessage.getInt("int-type");
+      assertEquals(37, intVal);
+      long longVal = mapMessage.getLong("long-type");
+      assertEquals(56652L, longVal);
+      Object objectVal = mapMessage.getObject("object-type");
+      Object origVal = new String("VVVV");
+      assertTrue(objectVal.equals(origVal));
+      short shortVal = mapMessage.getShort("short-type");
+      assertEquals((short) 333, shortVal);
+      String strVal = mapMessage.getString("string-type");
+      assertEquals(TEXT, strVal);
+
+      factory = new ActiveMQConnectionFactory(connectionUri);
+      factory.setUseCompression(false);
+      sendTestMapMessage(factory, TEXT);
+      mapMessage = receiveTestMapMessage(factory);
+      int unCompressedSize = mapMessage.getContent().getLength();
+
+      assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
+   }
+
+   public void testStreamMessageCompression() throws Exception {
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+      factory.setUseCompression(true);
+
+      sendTestStreamMessage(factory, TEXT);
+      ActiveMQStreamMessage streamMessage = receiveTestStreamMessage(factory);
+      int compressedSize = streamMessage.getContent().getLength();
+      assertTrue(streamMessage.isCompressed());
+
+      boolean booleanVal = streamMessage.readBoolean();
+      assertTrue(booleanVal);
+      byte byteVal = streamMessage.readByte();
+      assertEquals((byte)10, byteVal);
+      byte[] originVal = TEXT.getBytes();
+      byte[] bytesVal = new byte[originVal.length];
+      streamMessage.readBytes(bytesVal);
+      for (int i = 0; i < bytesVal.length; i++) {
+         assertTrue(bytesVal[i] == originVal[i]);
+      }
+      char charVal = streamMessage.readChar();
+      assertEquals('A', charVal);
+      double doubleVal = streamMessage.readDouble();
+      assertEquals(55.3D, doubleVal, 0.1D);
+      float floatVal = streamMessage.readFloat();
+      assertEquals(79.1F, floatVal, 0.1F);
+      int intVal = streamMessage.readInt();
+      assertEquals(37, intVal);
+      long longVal = streamMessage.readLong();
+      assertEquals(56652L, longVal);
+      Object objectVal = streamMessage.readObject();
+      Object origVal = new String("VVVV");
+      assertTrue(objectVal.equals(origVal));
+      short shortVal = streamMessage.readShort();
+      assertEquals((short) 333, shortVal);
+      String strVal = streamMessage.readString();
+      assertEquals(TEXT, strVal);
+
+      factory = new ActiveMQConnectionFactory(connectionUri);
+      factory.setUseCompression(false);
+      sendTestStreamMessage(factory, TEXT);
+      streamMessage = receiveTestStreamMessage(factory);
+      int unCompressedSize = streamMessage.getContent().getLength();
+
+      System.out.println("compressedSize: " + compressedSize + " un: " + unCompressedSize);
+      assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
+   }
+
+   public void testObjectMessageCompression() throws Exception {
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+      factory.setUseCompression(true);
+
+      sendTestObjectMessage(factory, TEXT);
+      ActiveMQObjectMessage objectMessage = receiveTestObjectMessage(factory);
+      int compressedSize = objectMessage.getContent().getLength();
+      assertTrue(objectMessage.isCompressed());
+
+      Object objectVal = objectMessage.getObject();
+      assertEquals(TEXT, objectVal);
+
+      factory = new ActiveMQConnectionFactory(connectionUri);
+      factory.setUseCompression(false);
+      sendTestObjectMessage(factory, TEXT);
+      objectMessage = receiveTestObjectMessage(factory);
+      int unCompressedSize = objectMessage.getContent().getLength();
+
+      assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
+   }
+
+   private void sendTestObjectMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
+      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(queue);
+      ObjectMessage objectMessage = session.createObjectMessage();
+
+      objectMessage.setObject(TEXT);
+
+      producer.send(objectMessage);
+      connection.close();
+   }
+
+   private ActiveMQObjectMessage receiveTestObjectMessage(ActiveMQConnectionFactory factory) throws JMSException {
+      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(queue);
+      ActiveMQObjectMessage rc = (ActiveMQObjectMessage) consumer.receive();
+      connection.close();
+      return rc;
+   }
+
+   private void sendTestStreamMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
+      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(queue);
+      StreamMessage streamMessage = session.createStreamMessage();
+
+      streamMessage.writeBoolean(true);
+      streamMessage.writeByte((byte) 10);
+      streamMessage.writeBytes(TEXT.getBytes());
+      streamMessage.writeChar('A');
+      streamMessage.writeDouble(55.3D);
+      streamMessage.writeFloat(79.1F);
+      streamMessage.writeInt(37);
+      streamMessage.writeLong(56652L);
+      streamMessage.writeObject(new String("VVVV"));
+      streamMessage.writeShort((short) 333);
+      streamMessage.writeString(TEXT);
+
+      producer.send(streamMessage);
+      connection.close();
+   }
+
+   private ActiveMQStreamMessage receiveTestStreamMessage(ActiveMQConnectionFactory factory) throws JMSException {
+      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(queue);
+      ActiveMQStreamMessage rc = (ActiveMQStreamMessage) consumer.receive();
+      connection.close();
+      return rc;
+   }
+
+   private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
+      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(queue);
+      MapMessage mapMessage = session.createMapMessage();
+
+      mapMessage.setBoolean("boolean-type", true);
+      mapMessage.setByte("byte-type", (byte) 10);
+      mapMessage.setBytes("bytes-type", TEXT.getBytes());
+      mapMessage.setChar("char-type", 'A');
+      mapMessage.setDouble("double-type", 55.3D);
+      mapMessage.setFloat("float-type", 79.1F);
+      mapMessage.setInt("int-type", 37);
+      mapMessage.setLong("long-type", 56652L);
+      mapMessage.setObject("object-type", new String("VVVV"));
+      mapMessage.setShort("short-type", (short) 333);
+      mapMessage.setString("string-type", TEXT);
+
+      producer.send(mapMessage);
+      connection.close();
+   }
+
+   private ActiveMQMapMessage receiveTestMapMessage(ActiveMQConnectionFactory factory) throws JMSException {
+      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(queue);
+      ActiveMQMapMessage rc = (ActiveMQMapMessage) consumer.receive();
+      connection.close();
+      return rc;
+   }
+
    private void sendTestMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
       ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0abf5246/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
new file mode 100644
index 0000000..2123f53
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.interop;
+
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public class CompressedInteropTest extends BasicOpenWireTest {
+
+   private static final String TEXT;
+   static {
+      StringBuilder builder = new StringBuilder();
+
+      for (int i = 0; i < 20; i++) {
+         builder.append("The quick red fox jumped over the lazy brown dog. ");
+      }
+      TEXT = builder.toString();
+   }
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      factory.setUseCompression(true);
+      super.setUp();
+      connection.start();
+      assertTrue(connection.isUseCompression());
+   }
+
+   @Test
+   public void testCoreReceiveOpenWireCompressedMessages() throws Exception {
+      //TextMessage
+      sendCompressedTextMessageUsingOpenWire();
+      receiveTextMessageUsingCore();
+      //BytesMessage
+      sendCompressedBytesMessageUsingOpenWire();
+      receiveBytesMessageUsingCore();
+      //MapMessage
+      sendCompressedMapMessageUsingOpenWire();
+      receiveMapMessageUsingCore();
+      //StreamMessage
+      sendCompressedStreamMessageUsingOpenWire();
+      receiveStreamMessageUsingCore();
+      //ObjectMessage
+      sendCompressedObjectMessageUsingOpenWire();
+      receiveObjectMessageUsingCore();
+   }
+
+   private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+      StreamMessage streamMessage = session.createStreamMessage();
+
+      streamMessage.writeBoolean(true);
+      streamMessage.writeByte((byte) 10);
+      streamMessage.writeBytes(TEXT.getBytes());
+      streamMessage.writeChar('A');
+      streamMessage.writeDouble(55.3D);
+      streamMessage.writeFloat(79.1F);
+      streamMessage.writeInt(37);
+      streamMessage.writeLong(56652L);
+      streamMessage.writeObject(new String("VVVV"));
+      streamMessage.writeShort((short) 333);
+      streamMessage.writeString(TEXT);
+
+      producer.send(streamMessage);
+   }
+
+   private void receiveStreamMessageUsingCore() throws Exception {
+      StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore();
+      boolean booleanVal = streamMessage.readBoolean();
+      assertTrue(booleanVal);
+      byte byteVal = streamMessage.readByte();
+      assertEquals((byte)10, byteVal);
+      byte[] originVal = TEXT.getBytes();
+      byte[] bytesVal = new byte[originVal.length];
+      streamMessage.readBytes(bytesVal);
+      for (int i = 0; i < bytesVal.length; i++) {
+         assertTrue(bytesVal[i] == originVal[i]);
+      }
+      char charVal = streamMessage.readChar();
+      assertEquals('A', charVal);
+      double doubleVal = streamMessage.readDouble();
+      assertEquals(55.3D, doubleVal, 0.1D);
+      float floatVal = streamMessage.readFloat();
+      assertEquals(79.1F, floatVal, 0.1F);
+      int intVal = streamMessage.readInt();
+      assertEquals(37, intVal);
+      long longVal = streamMessage.readLong();
+      assertEquals(56652L, longVal);
+      Object objectVal = streamMessage.readObject();
+      Object origVal = new String("VVVV");
+      assertTrue(objectVal.equals(origVal));
+      short shortVal = streamMessage.readShort();
+      assertEquals((short) 333, shortVal);
+      String strVal = streamMessage.readString();
+      assertEquals(TEXT, strVal);
+   }
+
+   private void sendCompressedObjectMessageUsingOpenWire() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+      ObjectMessage objectMessage = session.createObjectMessage();
+      objectMessage.setObject(TEXT);
+
+      producer.send(objectMessage);
+   }
+
+   private void receiveObjectMessageUsingCore() throws Exception {
+      ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore();
+      Object objectVal = objectMessage.getObject();
+      assertEquals(TEXT, objectVal);
+   }
+
+   private void sendCompressedMapMessageUsingOpenWire() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+      MapMessage mapMessage = session.createMapMessage();
+
+      mapMessage.setBoolean("boolean-type", true);
+      mapMessage.setByte("byte-type", (byte) 10);
+      mapMessage.setBytes("bytes-type", TEXT.getBytes());
+      mapMessage.setChar("char-type", 'A');
+      mapMessage.setDouble("double-type", 55.3D);
+      mapMessage.setFloat("float-type", 79.1F);
+      mapMessage.setInt("int-type", 37);
+      mapMessage.setLong("long-type", 56652L);
+      mapMessage.setObject("object-type", new String("VVVV"));
+      mapMessage.setShort("short-type", (short) 333);
+      mapMessage.setString("string-type", TEXT);
+
+      producer.send(mapMessage);
+   }
+
+   private void receiveMapMessageUsingCore() throws Exception {
+      MapMessage mapMessage = (MapMessage) receiveMessageUsingCore();
+
+      boolean booleanVal = mapMessage.getBoolean("boolean-type");
+      assertTrue(booleanVal);
+      byte byteVal = mapMessage.getByte("byte-type");
+      assertEquals((byte)10, byteVal);
+      byte[] bytesVal = mapMessage.getBytes("bytes-type");
+      byte[] originVal = TEXT.getBytes();
+      assertEquals(originVal.length, bytesVal.length);
+      for (int i = 0; i < bytesVal.length; i++) {
+         assertTrue(bytesVal[i] == originVal[i]);
+      }
+      char charVal = mapMessage.getChar("char-type");
+      assertEquals('A', charVal);
+      double doubleVal = mapMessage.getDouble("double-type");
+      assertEquals(55.3D, doubleVal, 0.1D);
+      float floatVal = mapMessage.getFloat("float-type");
+      assertEquals(79.1F, floatVal, 0.1F);
+      int intVal = mapMessage.getInt("int-type");
+      assertEquals(37, intVal);
+      long longVal = mapMessage.getLong("long-type");
+      assertEquals(56652L, longVal);
+      Object objectVal = mapMessage.getObject("object-type");
+      Object origVal = new String("VVVV");
+      assertTrue(objectVal.equals(origVal));
+      short shortVal = mapMessage.getShort("short-type");
+      assertEquals((short) 333, shortVal);
+      String strVal = mapMessage.getString("string-type");
+      assertEquals(TEXT, strVal);
+   }
+
+   private void sendCompressedBytesMessageUsingOpenWire() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+      BytesMessage bytesMessage = session.createBytesMessage();
+      bytesMessage.writeBytes(TEXT.getBytes());
+
+      producer.send(bytesMessage);
+   }
+
+   private void receiveBytesMessageUsingCore() throws Exception {
+      BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore();
+
+      byte[] bytes = new byte[TEXT.getBytes("UTF8").length];
+      bytesMessage.readBytes(bytes);
+      assertTrue(bytesMessage.readBytes(new byte[255]) == -1);
+
+      String rcvString = new String(bytes, "UTF8");
+      assertEquals(TEXT, rcvString);
+   }
+
+   private void receiveTextMessageUsingCore() throws Exception {
+      TextMessage txtMessage = (TextMessage) receiveMessageUsingCore();
+      assertEquals(TEXT, txtMessage.getText());
+   }
+
+   private Message receiveMessageUsingCore() throws Exception {
+      Connection jmsConn = null;
+      Message message = null;
+      try {
+         jmsConn = coreCf.createConnection();
+         jmsConn.start();
+
+         Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(this.queueName);
+         MessageConsumer coreConsumer = session.createConsumer(queue);
+
+         message = coreConsumer.receive(5000);
+      }
+      finally {
+         if (jmsConn != null) {
+            jmsConn.close();
+         }
+      }
+      return message;
+   }
+
+   private void sendCompressedTextMessageUsingOpenWire() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+      TextMessage textMessage = session.createTextMessage(TEXT);
+
+      producer.send(textMessage);
+   }
+
+}


[2/2] activemq-artemis git commit: This closes #163

Posted by cl...@apache.org.
This closes #163


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cbb6c63d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cbb6c63d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cbb6c63d

Branch: refs/heads/master
Commit: cbb6c63d0063efb2afb6fbf2ddbc533a02425091
Parents: 6408fd0 0abf524
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Sep 14 08:54:01 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 14 08:54:01 2015 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 177 +++++++++++--
 .../command/MessageCompressionTest.java         | 207 +++++++++++++++
 .../openwire/interop/CompressedInteropTest.java | 263 +++++++++++++++++++
 3 files changed, 631 insertions(+), 16 deletions(-)
----------------------------------------------------------------------