You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:06 UTC

[02/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

ARTEMIS-1616 OpenWire improvements

Refactored OpenWireMessageConverter::toAMQMessage into smaller methods


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6b6dd95
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6b6dd95
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6b6dd95

Branch: refs/heads/master
Commit: c6b6dd95d1665230d667557df240d8a62a2118af
Parents: e7a1dca
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 14:37:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 562 +++++++++++--------
 1 file changed, 316 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6b6dd95/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 0948f8a..3dc4a4e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       return null;
    }
 
-//   @Override
+   //   @Override
    public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
       final Message messageSend = (Message) message;
@@ -205,7 +205,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       final ProducerId producerId = messageSend.getProducerId();
       if (producerId != null) {
-         putMsgProducerId(producerId, marshaller, coreMessage);
+         final ByteSequence producerIdBytes = marshaller.marshal(producerId);
+         producerIdBytes.compact();
+         coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
       }
       final ByteSequence propBytes = messageSend.getMarshalledProperties();
       if (propBytes != null) {
@@ -437,14 +439,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
    }
 
-   private static void putMsgProducerId(final ProducerId producerId,
-                                        final WireFormat marshaller,
-                                        final CoreMessage coreMessage) throws IOException {
-      final ByteSequence producerIdBytes = marshaller.marshal(producerId);
-      producerIdBytes.compact();
-      coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
-   }
-
    private static void putMsgMarshalledProperties(final ByteSequence propBytes,
                                                   final Message messageSend,
                                                   final CoreMessage coreMessage) throws IOException {
@@ -512,9 +506,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    }
 
    public MessageDispatch createMessageDispatch(MessageReference reference,
-                                                       ICoreMessage message,
-                                                       AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
+                                                ICoreMessage message,
+                                                AMQConsumer consumer) throws IOException, JMSException {
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -529,35 +523,48 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       return md;
    }
 
-   private ActiveMQMessage toAMQMessage(MessageReference reference,
+   private static ActiveMQMessage toAMQMessage(MessageReference reference,
                                                ICoreMessage coreMessage,
+                                               WireFormat marshaller,
                                                AMQConsumer consumer) throws IOException {
-      ActiveMQMessage amqMsg = null;
-      byte coreType = coreMessage.getType();
+      final ActiveMQMessage amqMsg;
+      final byte coreType = coreMessage.getType();
+      final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+      final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+      final byte[] bytes;
+      final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+      buffer.resetReaderIndex();
+
       switch (coreType) {
          case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
             amqMsg = new ActiveMQBytesMessage();
+            bytes = toAMQMessageBytesType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
             amqMsg = new ActiveMQMapMessage();
+            bytes = toAMQMessageMapType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
             amqMsg = new ActiveMQObjectMessage();
+            bytes = toAMQMessageObjectType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
             amqMsg = new ActiveMQStreamMessage();
+            bytes = toAMQMessageStreamType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
             amqMsg = new ActiveMQTextMessage();
+            bytes = toAMQMessageTextType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE:
             amqMsg = new ActiveMQMessage();
+            bytes = toAMQMessageDefaultType(buffer, isCompressed);
             break;
          default:
             throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
       }
 
-      String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
+      final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
       if (type != null) {
          amqMsg.setJMSType(type);
       }
@@ -572,165 +579,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setBrokerInTime(brokerInTime);
 
-      ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
-      Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
-      boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
       amqMsg.setCompressed(isCompressed);
 
-      byte[] bytes = null;
-      if (buffer != null) {
-         buffer.resetReaderIndex();
-
-         if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
-            SimpleString text = buffer.readNullableSimpleString();
-            if (text != null) {
-               ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
-               OutputStream out = bytesOut;
-               if (isCompressed) {
-                  out = new DeflaterOutputStream(out, true);
-               }
-               try (DataOutputStream dataOut = new DataOutputStream(out)) {
-                  MarshallingSupport.writeUTF8(dataOut, text.toString());
-                  dataOut.flush();
-                  bytes = bytesOut.toByteArray();
-               }
-            }
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
-            TypedProperties mapData = new TypedProperties();
-            //it could be a null map
-            if (buffer.readableBytes() > 0) {
-               mapData.decode(buffer.byteBuf());
-               Map<String, Object> map = mapData.getMap();
-               ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
-               OutputStream os = out;
-               if (isCompressed) {
-                  os = new DeflaterOutputStream(os, true);
-               }
-               try (DataOutputStream dataOut = new DataOutputStream(os)) {
-                  MarshallingSupport.marshalPrimitiveMap(map, dataOut);
-                  dataOut.flush();
-               }
-               bytes = out.toByteArray();
-            }
-
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
-            if (buffer.readableBytes() > 0) {
-               int len = buffer.readInt();
-               bytes = new byte[len];
-               buffer.readBytes(bytes);
-               if (isCompressed) {
-                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                  try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
-                     out.write(bytes);
-                     out.flush();
-                  }
-                  bytes = bytesOut.toByteArray();
-               }
-            }
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
-            org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
-            OutputStream out = bytesOut;
-            if (isCompressed) {
-               out = new DeflaterOutputStream(bytesOut, true);
-            }
-            try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
-               boolean stop = false;
-               while (!stop && buffer.readable()) {
-                  byte primitiveType = buffer.readByte();
-                  switch (primitiveType) {
-                     case DataConstants.BOOLEAN:
-                        MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
-                        break;
-                     case DataConstants.BYTE:
-                        MarshallingSupport.marshalByte(dataOut, buffer.readByte());
-                        break;
-                     case DataConstants.BYTES:
-                        int len = buffer.readInt();
-                        byte[] bytesData = new byte[len];
-                        buffer.readBytes(bytesData);
-                        MarshallingSupport.marshalByteArray(dataOut, bytesData);
-                        break;
-                     case DataConstants.CHAR:
-                        char ch = (char) buffer.readShort();
-                        MarshallingSupport.marshalChar(dataOut, ch);
-                        break;
-                     case DataConstants.DOUBLE:
-                        double doubleVal = Double.longBitsToDouble(buffer.readLong());
-                        MarshallingSupport.marshalDouble(dataOut, doubleVal);
-                        break;
-                     case DataConstants.FLOAT:
-                        Float floatVal = Float.intBitsToFloat(buffer.readInt());
-                        MarshallingSupport.marshalFloat(dataOut, floatVal);
-                        break;
-                     case DataConstants.INT:
-                        MarshallingSupport.marshalInt(dataOut, buffer.readInt());
-                        break;
-                     case DataConstants.LONG:
-                        MarshallingSupport.marshalLong(dataOut, buffer.readLong());
-                        break;
-                     case DataConstants.SHORT:
-                        MarshallingSupport.marshalShort(dataOut, buffer.readShort());
-                        break;
-                     case DataConstants.STRING:
-                        String string = buffer.readNullableString();
-                        if (string == null) {
-                           MarshallingSupport.marshalNull(dataOut);
-                        } else {
-                           MarshallingSupport.marshalString(dataOut, string);
-                        }
-                        break;
-                     default:
-                        //now we stop
-                        stop = true;
-                        break;
-                  }
-                  dataOut.flush();
-               }
-            }
-            bytes = bytesOut.toByteArray();
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
-            int n = buffer.readableBytes();
-            bytes = new byte[n];
-            buffer.readBytes(bytes);
-            if (isCompressed) {
-               int length = bytes.length;
-               Deflater deflater = new Deflater();
-               try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                  compressed.write(new byte[4]);
-                  deflater.setInput(bytes);
-                  deflater.finish();
-                  byte[] bytesBuf = new byte[1024];
-                  while (!deflater.finished()) {
-                     int count = deflater.deflate(bytesBuf);
-                     compressed.write(bytesBuf, 0, count);
-                  }
-                  compressed.flush();
-                  ByteSequence byteSeq = compressed.toByteSequence();
-                  ByteSequenceData.writeIntBig(byteSeq, length);
-                  bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
-               } finally {
-                  deflater.end();
-               }
-            }
-         } else {
-            int n = buffer.readableBytes();
-            bytes = new byte[n];
-            buffer.readBytes(bytes);
-            if (isCompressed) {
-               try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                    DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
-                  out.write(bytes);
-                  out.flush();
-                  bytes = bytesOut.toByteArray();
-               }
-            }
-         }
-
-         buffer.resetReaderIndex();// this is important for topics as the buffer
-         // may be read multiple times
-      }
-
       //we need check null because messages may come from other clients
       //and those amq specific attribute may not be set.
       Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
@@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setArrival(arrival);
 
-      String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
-      if (brokerPath != null && brokerPath.isEmpty()) {
-         String[] brokers = brokerPath.split(",");
-         BrokerId[] bids = new BrokerId[brokers.length];
-         for (int i = 0; i < bids.length; i++) {
-            bids[i] = new BrokerId(brokers[i]);
-         }
-         amqMsg.setBrokerPath(bids);
+      final String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
+      if (brokerPath != null && !brokerPath.isEmpty()) {
+         setAMQMsgBrokerPath(amqMsg, brokerPath);
       }
 
-      String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
-      if (clusterPath != null && clusterPath.isEmpty()) {
-         String[] cluster = clusterPath.split(",");
-         BrokerId[] bids = new BrokerId[cluster.length];
-         for (int i = 0; i < bids.length; i++) {
-            bids[i] = new BrokerId(cluster[i]);
-         }
-         amqMsg.setCluster(bids);
+      final String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
+      if (clusterPath != null && !clusterPath.isEmpty()) {
+         setAMQMsgClusterPath(amqMsg, clusterPath);
       }
 
       Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
@@ -766,21 +606,19 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setCommandId(commandId);
 
-      SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
+      final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
       if (corrId != null) {
          amqMsg.setCorrelationId(corrId.toString());
       }
 
-      byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
+      final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
       if (dsBytes != null) {
-         ByteSequence seq = new ByteSequence(dsBytes);
-         DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
-         amqMsg.setDataStructure(ds);
+         setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
       }
       final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
       amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
 
-      Object value = coreMessage.getGroupID();
+      final Object value = coreMessage.getGroupID();
       if (value != null) {
          String groupId = value.toString();
          amqMsg.setGroupID(groupId);
@@ -792,8 +630,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setGroupSequence(groupSequence);
 
-      MessageId mid = null;
-      byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
+      final MessageId mid;
+      final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
       if (midBytes != null) {
          ByteSequence midSeq = new ByteSequence(midBytes);
          mid = (MessageId) marshaller.unmarshal(midSeq);
@@ -803,97 +641,329 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       amqMsg.setMessageId(mid);
 
-      byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
+      final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
       if (origDestBytes != null) {
-         ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
-         amqMsg.setOriginalDestination(origDest);
+         setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
       }
 
-      byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
+      final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
       if (origTxIdBytes != null) {
-         TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
-         amqMsg.setOriginalTransactionId(origTxId);
+         setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
       }
 
-      byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
+      final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
       if (producerIdBytes != null) {
          ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
          amqMsg.setProducerId(producerId);
       }
 
-      byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
+      final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
       if (marshalledBytes != null) {
          amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
       }
 
       amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
 
-      byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
+      final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
       if (replyToBytes != null) {
-         ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
-         amqMsg.setReplyTo(replyTo);
+         setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
       }
 
-      String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
+      final String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
       if (userId != null) {
          amqMsg.setUserID(userId);
       }
 
-      Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+      final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
       if (isDroppable != null) {
          amqMsg.setDroppable(isDroppable);
       }
 
-      SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+      final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
       if (dlqCause != null) {
-         try {
-            amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
-         } catch (JMSException e) {
-            throw new IOException("failure to set dlq property " + dlqCause, e);
-         }
+         setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
       }
 
-      SimpleString lastValueProperty = coreMessage.getLastValueProperty();
+      final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
       if (lastValueProperty != null) {
-         try {
-            amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
-         } catch (JMSException e) {
-            throw new IOException("failure to set lvq property " + dlqCause, e);
-         }
+         setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
       }
 
-      Set<SimpleString> props = coreMessage.getPropertyNames();
+      final Set<SimpleString> props = coreMessage.getPropertyNames();
       if (props != null) {
-         for (SimpleString s : props) {
-            String keyStr = s.toString();
-            if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
-                    !consumer.hasNotificationDestination()) {
-               continue;
+         setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer);
+      }
+
+      if (bytes != null) {
+         ByteSequence content = new ByteSequence(bytes);
+         amqMsg.setContent(content);
+      }
+      return amqMsg;
+   }
+
+   private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
+                                              final boolean isCompressed) throws IOException {
+      byte[] bytes = null;
+      SimpleString text = buffer.readNullableSimpleString();
+      if (text != null) {
+         ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
+         OutputStream out = bytesOut;
+         if (isCompressed) {
+            out = new DeflaterOutputStream(out, true);
+         }
+         try (DataOutputStream dataOut = new DataOutputStream(out)) {
+            MarshallingSupport.writeUTF8(dataOut, text.toString());
+            dataOut.flush();
+            bytes = bytesOut.toByteArray();
+         }
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageMapType(final ActiveMQBuffer buffer,
+                                             final boolean isCompressed) throws IOException {
+      byte[] bytes = null;
+      //it could be a null map
+      if (buffer.readableBytes() > 0) {
+         TypedProperties mapData = new TypedProperties();
+         mapData.decode(buffer.byteBuf());
+         Map<String, Object> map = mapData.getMap();
+         ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+         OutputStream os = out;
+         if (isCompressed) {
+            os = new DeflaterOutputStream(os, true);
+         }
+         try (DataOutputStream dataOut = new DataOutputStream(os)) {
+            MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+            dataOut.flush();
+         }
+         bytes = out.toByteArray();
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer,
+                                                final boolean isCompressed) throws IOException {
+      byte[] bytes = null;
+      if (buffer.readableBytes() > 0) {
+         int len = buffer.readInt();
+         bytes = new byte[len];
+         buffer.readBytes(bytes);
+         if (isCompressed) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+               out.write(bytes);
+               out.flush();
             }
-            Object prop = coreMessage.getObjectProperty(s);
-            try {
-               if (prop instanceof SimpleString) {
-                  amqMsg.setObjectProperty(s.toString(), prop.toString());
-               } else {
-                  if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
-                     Long l = (Long) prop;
-                     amqMsg.setObjectProperty(s.toString(), l.intValue());
+            bytes = bytesOut.toByteArray();
+         }
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer,
+                                                final boolean isCompressed) throws IOException {
+      byte[] bytes;
+      org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+      OutputStream out = bytesOut;
+      if (isCompressed) {
+         out = new DeflaterOutputStream(bytesOut, true);
+      }
+      try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+         boolean stop = false;
+         while (!stop && buffer.readable()) {
+            byte primitiveType = buffer.readByte();
+            switch (primitiveType) {
+               case DataConstants.BOOLEAN:
+                  MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+                  break;
+               case DataConstants.BYTE:
+                  MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+                  break;
+               case DataConstants.BYTES:
+                  int len = buffer.readInt();
+                  byte[] bytesData = new byte[len];
+                  buffer.readBytes(bytesData);
+                  MarshallingSupport.marshalByteArray(dataOut, bytesData);
+                  break;
+               case DataConstants.CHAR:
+                  char ch = (char) buffer.readShort();
+                  MarshallingSupport.marshalChar(dataOut, ch);
+                  break;
+               case DataConstants.DOUBLE:
+                  double doubleVal = Double.longBitsToDouble(buffer.readLong());
+                  MarshallingSupport.marshalDouble(dataOut, doubleVal);
+                  break;
+               case DataConstants.FLOAT:
+                  Float floatVal = Float.intBitsToFloat(buffer.readInt());
+                  MarshallingSupport.marshalFloat(dataOut, floatVal);
+                  break;
+               case DataConstants.INT:
+                  MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+                  break;
+               case DataConstants.LONG:
+                  MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+                  break;
+               case DataConstants.SHORT:
+                  MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+                  break;
+               case DataConstants.STRING:
+                  String string = buffer.readNullableString();
+                  if (string == null) {
+                     MarshallingSupport.marshalNull(dataOut);
                   } else {
-                     amqMsg.setObjectProperty(s.toString(), prop);
+                     MarshallingSupport.marshalString(dataOut, string);
                   }
-               }
-            } catch (JMSException e) {
-               throw new IOException("exception setting property " + s + " : " + prop, e);
+                  break;
+               default:
+                  //now we stop
+                  stop = true;
+                  break;
             }
+            dataOut.flush();
          }
       }
+      bytes = bytesOut.toByteArray();
+      return bytes;
+   }
 
-      amqMsg.setCompressed(isCompressed);
-      if (bytes != null) {
-         ByteSequence content = new ByteSequence(bytes);
-         amqMsg.setContent(content);
+   private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer,
+                                               final boolean isCompressed) throws IOException {
+      int n = buffer.readableBytes();
+      byte[] bytes = new byte[n];
+      buffer.readBytes(bytes);
+      if (isCompressed) {
+         bytes = toAMQMessageCompressedBytesType(bytes);
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException {
+      int length = bytes.length;
+      Deflater deflater = new Deflater();
+      try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+         compressed.write(new byte[4]);
+         deflater.setInput(bytes);
+         deflater.finish();
+         byte[] bytesBuf = new byte[1024];
+         while (!deflater.finished()) {
+            int count = deflater.deflate(bytesBuf);
+            compressed.write(bytesBuf, 0, count);
+         }
+         compressed.flush();
+         ByteSequence byteSeq = compressed.toByteSequence();
+         ByteSequenceData.writeIntBig(byteSeq, length);
+         return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+      } finally {
+         deflater.end();
       }
-      return amqMsg;
    }
 
+   private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer,
+                                                 final boolean isCompressed) throws IOException {
+      int n = buffer.readableBytes();
+      byte[] bytes = new byte[n];
+      buffer.readBytes(bytes);
+      if (isCompressed) {
+         try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+            out.write(bytes);
+            out.flush();
+            bytes = bytesOut.toByteArray();
+         }
+      }
+      return bytes;
+   }
+
+   private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) {
+      String[] brokers = brokerPath.split(",");
+      BrokerId[] bids = new BrokerId[brokers.length];
+      for (int i = 0; i < bids.length; i++) {
+         bids[i] = new BrokerId(brokers[i]);
+      }
+      amqMsg.setBrokerPath(bids);
+   }
+
+   private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) {
+      String[] cluster = clusterPath.split(",");
+      BrokerId[] bids = new BrokerId[cluster.length];
+      for (int i = 0; i < bids.length; i++) {
+         bids[i] = new BrokerId(cluster[i]);
+      }
+      amqMsg.setCluster(bids);
+   }
+
+   private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg,
+                                              final WireFormat marshaller,
+                                              final byte[] dsBytes) throws IOException {
+      ByteSequence seq = new ByteSequence(dsBytes);
+      DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
+      amqMsg.setDataStructure(ds);
+   }
+
+   private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg,
+                                                    final WireFormat marshaller,
+                                                    final byte[] origDestBytes) throws IOException {
+      ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
+      amqMsg.setOriginalDestination(origDest);
+   }
+
+   private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg,
+                                                      final WireFormat marshaller,
+                                                      final byte[] origTxIdBytes) throws IOException {
+      TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
+      amqMsg.setOriginalTransactionId(origTxId);
+   }
+
+   private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg,
+                                        final WireFormat marshaller,
+                                        final byte[] replyToBytes) throws IOException {
+      ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
+      amqMsg.setReplyTo(replyTo);
+   }
+
+   private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
+                                                        final SimpleString dlqCause) throws IOException {
+      try {
+         amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
+      } catch (JMSException e) {
+         throw new IOException("failure to set dlq property " + dlqCause, e);
+      }
+   }
+
+   private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg,
+                                                 final SimpleString lastValueProperty) throws IOException {
+      try {
+         amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
+      } catch (JMSException e) {
+         throw new IOException("failure to set lvq property " + lastValueProperty, e);
+      }
+   }
+
+   /**
+    * Copies the properties named in {@code props} from {@code coreMessage} onto {@code amqMsg}.
+    * <p>
+    * Unless the consumer has a notification destination, keys starting with {@code _AMQ} or
+    * {@code __HDR_} are skipped. {@link SimpleString} values are copied as {@link String}, and a
+    * {@link Long} stored under {@code MessageUtil.JMSXDELIVERYCOUNT} is copied as an int value.
+    *
+    * @param amqMsg      message receiving the properties
+    * @param coreMessage message the property values are read from
+    * @param props       names of the properties to copy
+    * @param consumer    consumer whose notification-destination flag controls the key filtering
+    * @throws IOException wrapping the {@link JMSException} raised when a property cannot be set
+    */
+   private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
+                                                 final ICoreMessage coreMessage,
+                                                 final Set<SimpleString> props,
+                                                 final AMQConsumer consumer) throws IOException {
+      for (SimpleString key : props) {
+         final String name = key.toString();
+         if (!consumer.hasNotificationDestination()) {
+            if (name.startsWith("_AMQ") || name.startsWith("__HDR_")) {
+               continue;
+            }
+         }
+         final Object raw = coreMessage.getObjectProperty(key);
+         final Object converted;
+         if (raw instanceof SimpleString) {
+            converted = raw.toString();
+         } else if (name.equals(MessageUtil.JMSXDELIVERYCOUNT) && raw instanceof Long) {
+            // The delivery count is handed over as an int rather than a long.
+            converted = ((Long) raw).intValue();
+         } else {
+            converted = raw;
+         }
+         try {
+            amqMsg.setObjectProperty(name, converted);
+         } catch (JMSException jmsFailure) {
+            throw new IOException("exception setting property " + key + " : " + raw, jmsFailure);
+         }
+      }
+   }
 }