You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:09 UTC

[05/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

ARTEMIS-1616 OpenWire improvements

Refactored OpenWireMessageConverter::inbound into smaller methods


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54d01618
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54d01618
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54d01618

Branch: refs/heads/master
Commit: 54d0161850bc4543839d4f36b46f3bd84f5cd8e1
Parents: 2db4eaf
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 17:01:54 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 474 +++++++++++--------
 1 file changed, 280 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54d01618/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 83ff6d6..54f3c99 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -123,10 +123,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 //   @Override
    public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
-      Message messageSend = (Message) message;
-      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
+      final Message messageSend = (Message) message;
+      final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
 
-      String type = messageSend.getType();
+      final String type = messageSend.getType();
       if (type != null) {
          coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
       }
@@ -135,264 +135,350 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       coreMessage.setPriority(messageSend.getPriority());
       coreMessage.setTimestamp(messageSend.getTimestamp());
 
-      byte coreType = toCoreType(messageSend.getDataStructureType());
+      final byte coreType = toCoreType(messageSend.getDataStructureType());
       coreMessage.setType(coreType);
 
-      ActiveMQBuffer body = coreMessage.getBodyBuffer();
+      final ActiveMQBuffer body = coreMessage.getBodyBuffer();
 
-      ByteSequence contents = messageSend.getContent();
+      final ByteSequence contents = messageSend.getContent();
       if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
          body.writeNullableString(null);
       } else if (contents != null) {
-         boolean messageCompressed = messageSend.isCompressed();
+         final boolean messageCompressed = messageSend.isCompressed();
          if (messageCompressed) {
             coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
          }
 
          switch (coreType) {
             case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
-               InputStream tis = new ByteArrayInputStream(contents);
-               if (messageCompressed) {
-                  tis = new InflaterInputStream(tis);
-               }
-               DataInputStream tdataIn = new DataInputStream(tis);
-               String text = MarshallingSupport.readUTF8(tdataIn);
-               tdataIn.close();
-               body.writeNullableSimpleString(new SimpleString(text));
+               writeTextType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
-               InputStream mis = new ByteArrayInputStream(contents);
-               if (messageCompressed) {
-                  mis = new InflaterInputStream(mis);
-               }
-               DataInputStream mdataIn = new DataInputStream(mis);
-               Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
-               mdataIn.close();
-               TypedProperties props = new TypedProperties();
-               loadMapIntoProperties(props, map);
-               props.encode(body.byteBuf());
+               writeMapType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
-               if (messageCompressed) {
-                  try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
-                       org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                     byte[] buf = new byte[1024];
-                     int n = ois.read(buf);
-                     while (n != -1) {
-                        decompressed.write(buf, 0, n);
-                        n = ois.read();
-                     }
-                     //read done
-                     contents = decompressed.toByteSequence();
-                  }
-               }
-               body.writeInt(contents.length);
-               body.writeBytes(contents.data, contents.offset, contents.length);
+               writeObjectType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
-               InputStream sis = new ByteArrayInputStream(contents);
-               if (messageCompressed) {
-                  sis = new InflaterInputStream(sis);
-               }
-               DataInputStream sdis = new DataInputStream(sis);
-               int stype = sdis.read();
-               while (stype != -1) {
-                  switch (stype) {
-                     case MarshallingSupport.BOOLEAN_TYPE:
-                        body.writeByte(DataConstants.BOOLEAN);
-                        body.writeBoolean(sdis.readBoolean());
-                        break;
-                     case MarshallingSupport.BYTE_TYPE:
-                        body.writeByte(DataConstants.BYTE);
-                        body.writeByte(sdis.readByte());
-                        break;
-                     case MarshallingSupport.BYTE_ARRAY_TYPE:
-                        body.writeByte(DataConstants.BYTES);
-                        int slen = sdis.readInt();
-                        byte[] sbytes = new byte[slen];
-                        sdis.read(sbytes);
-                        body.writeInt(slen);
-                        body.writeBytes(sbytes);
-                        break;
-                     case MarshallingSupport.CHAR_TYPE:
-                        body.writeByte(DataConstants.CHAR);
-                        char schar = sdis.readChar();
-                        body.writeShort((short) schar);
-                        break;
-                     case MarshallingSupport.DOUBLE_TYPE:
-                        body.writeByte(DataConstants.DOUBLE);
-                        double sdouble = sdis.readDouble();
-                        body.writeLong(Double.doubleToLongBits(sdouble));
-                        break;
-                     case MarshallingSupport.FLOAT_TYPE:
-                        body.writeByte(DataConstants.FLOAT);
-                        float sfloat = sdis.readFloat();
-                        body.writeInt(Float.floatToIntBits(sfloat));
-                        break;
-                     case MarshallingSupport.INTEGER_TYPE:
-                        body.writeByte(DataConstants.INT);
-                        body.writeInt(sdis.readInt());
-                        break;
-                     case MarshallingSupport.LONG_TYPE:
-                        body.writeByte(DataConstants.LONG);
-                        body.writeLong(sdis.readLong());
-                        break;
-                     case MarshallingSupport.SHORT_TYPE:
-                        body.writeByte(DataConstants.SHORT);
-                        body.writeShort(sdis.readShort());
-                        break;
-                     case MarshallingSupport.STRING_TYPE:
-                        body.writeByte(DataConstants.STRING);
-                        String sstring = sdis.readUTF();
-                        body.writeNullableString(sstring);
-                        break;
-                     case MarshallingSupport.BIG_STRING_TYPE:
-                        body.writeByte(DataConstants.STRING);
-                        String sbigString = MarshallingSupport.readUTF8(sdis);
-                        body.writeNullableString(sbigString);
-                        break;
-                     case MarshallingSupport.NULL:
-                        body.writeByte(DataConstants.STRING);
-                        body.writeNullableString(null);
-                        break;
-                     default:
-                        //something we don't know, ignore
-                        break;
-                  }
-                  stype = sdis.read();
-               }
-               sdis.close();
+               writeStreamType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
-               if (messageCompressed) {
-                  Inflater inflater = new Inflater();
-                  try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                     int length = ByteSequenceData.readIntBig(contents);
-                     contents.offset = 0;
-                     byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
-
-                     inflater.setInput(data);
-                     byte[] buffer = new byte[length];
-                     int count = inflater.inflate(buffer);
-                     decompressed.write(buffer, 0, count);
-                     contents = decompressed.toByteSequence();
-                  } catch (Exception e) {
-                     throw new IOException(e);
-                  } finally {
-                     inflater.end();
-                  }
-               }
-               body.writeBytes(contents.data, contents.offset, contents.length);
+               writeBytesType(contents, messageCompressed, body);
                break;
             default:
-               if (messageCompressed) {
-                  try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
-                       OutputStream os = new InflaterOutputStream(decompressed)) {
-                     os.write(contents.data, contents.offset, contents.getLength());
-                     contents = decompressed.toByteSequence();
-                  } catch (Exception e) {
-                     throw new IOException(e);
-                  }
-               }
-               body.writeBytes(contents.data, contents.offset, contents.length);
+               writeDefaultType(contents, messageCompressed, body);
                break;
          }
       }
       //amq specific
       coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
       coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
-      BrokerId[] brokers = messageSend.getBrokerPath();
+      final BrokerId[] brokers = messageSend.getBrokerPath();
       if (brokers != null) {
-         StringBuilder builder = new StringBuilder();
-         for (int i = 0; i < brokers.length; i++) {
-            builder.append(brokers[i].getValue());
-            if (i != (brokers.length - 1)) {
-               builder.append(","); //is this separator safe?
-            }
-         }
-         coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+         putMsgBrokerPath(brokers, coreMessage);
       }
-      BrokerId[] cluster = messageSend.getCluster();
+      final BrokerId[] cluster = messageSend.getCluster();
       if (cluster != null) {
-         StringBuilder builder = new StringBuilder();
-         for (int i = 0; i < cluster.length; i++) {
-            builder.append(cluster[i].getValue());
-            if (i != (cluster.length - 1)) {
-               builder.append(","); //is this separator safe?
-            }
-         }
-         coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+         putMsgCluster(cluster, coreMessage);
       }
 
       coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
-      String corrId = messageSend.getCorrelationId();
+      final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
          coreMessage.putStringProperty("JMSCorrelationID", corrId);
       }
-      DataStructure ds = messageSend.getDataStructure();
+      final DataStructure ds = messageSend.getDataStructure();
       if (ds != null) {
-         ByteSequence dsBytes = marshaller.marshal(ds);
-         dsBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
+         putMsgDataStructure(ds, marshaller, coreMessage);
       }
-      String groupId = messageSend.getGroupID();
+      final String groupId = messageSend.getGroupID();
       if (groupId != null) {
          coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
       }
       coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
 
-      MessageId messageId = messageSend.getMessageId();
+      final MessageId messageId = messageSend.getMessageId();
 
-      ByteSequence midBytes = marshaller.marshal(messageId);
+      final ByteSequence midBytes = marshaller.marshal(messageId);
       midBytes.compact();
       coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
 
-      ProducerId producerId = messageSend.getProducerId();
+      final ProducerId producerId = messageSend.getProducerId();
       if (producerId != null) {
-         ByteSequence producerIdBytes = marshaller.marshal(producerId);
-         producerIdBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
+         putMsgProducerId(producerId, marshaller, coreMessage);
       }
-      ByteSequence propBytes = messageSend.getMarshalledProperties();
+      final ByteSequence propBytes = messageSend.getMarshalledProperties();
       if (propBytes != null) {
-         propBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
-         //unmarshall properties to core so selector will work
-         Map<String, Object> props = messageSend.getProperties();
-         //Map<String, Object> props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes)));
-         for (Entry<String, Object> ent : props.entrySet()) {
-            Object value = ent.getValue();
-            try {
-               coreMessage.putObjectProperty(ent.getKey(), value);
-            } catch (ActiveMQPropertyConversionException e) {
-               coreMessage.putStringProperty(ent.getKey(), value.toString());
-            }
-         }
+         putMsgMarshalledProperties(propBytes, messageSend, coreMessage);
       }
 
-      ActiveMQDestination replyTo = messageSend.getReplyTo();
+      final ActiveMQDestination replyTo = messageSend.getReplyTo();
       if (replyTo != null) {
-         ByteSequence replyToBytes = marshaller.marshal(replyTo);
-         replyToBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
+         putMsgReplyTo(replyTo, marshaller, coreMessage);
       }
 
-      String userId = messageSend.getUserID();
+      final String userId = messageSend.getUserID();
       if (userId != null) {
          coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
       }
       coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
 
-      ActiveMQDestination origDest = messageSend.getOriginalDestination();
+      final ActiveMQDestination origDest = messageSend.getOriginalDestination();
       if (origDest != null) {
-         ByteSequence origDestBytes = marshaller.marshal(origDest);
-         origDestBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+         putMsgOriginalDestination(origDest, marshaller, coreMessage);
       }
 
       return coreMessage;
    }
 
+   private static void writeTextType(final ByteSequence contents,
+                                     final boolean messageCompressed,
+                                     final ActiveMQBuffer body) throws IOException {
+      InputStream tis = new ByteArrayInputStream(contents);
+      if (messageCompressed) {
+         tis = new InflaterInputStream(tis);
+      }
+      DataInputStream tdataIn = new DataInputStream(tis);
+      String text = MarshallingSupport.readUTF8(tdataIn);
+      tdataIn.close();
+      body.writeNullableSimpleString(new SimpleString(text));
+   }
+
+   private static void writeMapType(final ByteSequence contents,
+                                    final boolean messageCompressed,
+                                    final ActiveMQBuffer body) throws IOException {
+      InputStream mis = new ByteArrayInputStream(contents);
+      if (messageCompressed) {
+         mis = new InflaterInputStream(mis);
+      }
+      DataInputStream mdataIn = new DataInputStream(mis);
+      Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
+      mdataIn.close();
+      TypedProperties props = new TypedProperties();
+      loadMapIntoProperties(props, map);
+      props.encode(body.byteBuf());
+   }
+
+   private static void writeObjectType(ByteSequence contents,
+                                       final boolean messageCompressed,
+                                       final ActiveMQBuffer body) throws IOException {
+      if (messageCompressed) {
+         contents = writeCompressedObjectType(contents);
+      }
+      body.writeInt(contents.length);
+      body.writeBytes(contents.data, contents.offset, contents.length);
+   }
+
+   private static ByteSequence writeCompressedObjectType(final ByteSequence contents) throws IOException { // inflates contents into a new ByteSequence
+      try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
+           org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+         byte[] buf = new byte[1024]; // scratch chunk reused for every read
+         int n = ois.read(buf);
+         while (n != -1) {
+            decompressed.write(buf, 0, n);
+            n = ois.read(buf); // must refill the chunk: the no-arg read() returns one byte's value, not a byte count
+         }
+         //read done
+         return decompressed.toByteSequence();
+      }
+   }
+
+   private static void writeStreamType(final ByteSequence contents,
+                                       final boolean messageCompressed,
+                                       final ActiveMQBuffer body) throws IOException {
+      InputStream sis = new ByteArrayInputStream(contents);
+      if (messageCompressed) {
+         sis = new InflaterInputStream(sis);
+      }
+      DataInputStream sdis = new DataInputStream(sis); // each entry is a type tag byte followed by its value
+      int stype = sdis.read();
+      while (stype != -1) {
+         switch (stype) {
+            case MarshallingSupport.BOOLEAN_TYPE:
+               body.writeByte(DataConstants.BOOLEAN);
+               body.writeBoolean(sdis.readBoolean());
+               break;
+            case MarshallingSupport.BYTE_TYPE:
+               body.writeByte(DataConstants.BYTE);
+               body.writeByte(sdis.readByte());
+               break;
+            case MarshallingSupport.BYTE_ARRAY_TYPE:
+               body.writeByte(DataConstants.BYTES);
+               int slen = sdis.readInt();
+               byte[] sbytes = new byte[slen];
+               sdis.readFully(sbytes); // read() may return short (notably when inflating); readFully fills the array or throws EOFException
+               body.writeInt(slen);
+               body.writeBytes(sbytes);
+               break;
+            case MarshallingSupport.CHAR_TYPE:
+               body.writeByte(DataConstants.CHAR);
+               char schar = sdis.readChar();
+               body.writeShort((short) schar);
+               break;
+            case MarshallingSupport.DOUBLE_TYPE:
+               body.writeByte(DataConstants.DOUBLE);
+               double sdouble = sdis.readDouble();
+               body.writeLong(Double.doubleToLongBits(sdouble));
+               break;
+            case MarshallingSupport.FLOAT_TYPE:
+               body.writeByte(DataConstants.FLOAT);
+               float sfloat = sdis.readFloat();
+               body.writeInt(Float.floatToIntBits(sfloat));
+               break;
+            case MarshallingSupport.INTEGER_TYPE:
+               body.writeByte(DataConstants.INT);
+               body.writeInt(sdis.readInt());
+               break;
+            case MarshallingSupport.LONG_TYPE:
+               body.writeByte(DataConstants.LONG);
+               body.writeLong(sdis.readLong());
+               break;
+            case MarshallingSupport.SHORT_TYPE:
+               body.writeByte(DataConstants.SHORT);
+               body.writeShort(sdis.readShort());
+               break;
+            case MarshallingSupport.STRING_TYPE:
+               body.writeByte(DataConstants.STRING);
+               String sstring = sdis.readUTF();
+               body.writeNullableString(sstring);
+               break;
+            case MarshallingSupport.BIG_STRING_TYPE:
+               body.writeByte(DataConstants.STRING);
+               String sbigString = MarshallingSupport.readUTF8(sdis);
+               body.writeNullableString(sbigString);
+               break;
+            case MarshallingSupport.NULL:
+               body.writeByte(DataConstants.STRING);
+               body.writeNullableString(null);
+               break;
+            default:
+               //something we don't know, ignore
+               break;
+         }
+         stype = sdis.read();
+      }
+      sdis.close();
+   }
+
+   private static void writeBytesType(ByteSequence contents,
+                                      final boolean messageCompressed,
+                                      final ActiveMQBuffer body) throws IOException {
+      if (messageCompressed) {
+         contents = writeCompressedBytesType(contents);
+      }
+      body.writeBytes(contents.data, contents.offset, contents.length);
+   }
+
+   private static ByteSequence writeCompressedBytesType(final ByteSequence contents) throws IOException {
+      Inflater inflater = new Inflater();
+      try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+         int length = ByteSequenceData.readIntBig(contents);
+         contents.offset = 0;
+         byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
+
+         inflater.setInput(data);
+         byte[] buffer = new byte[length];
+         int count = inflater.inflate(buffer);
+         decompressed.write(buffer, 0, count);
+         return decompressed.toByteSequence();
+      } catch (Exception e) {
+         throw new IOException(e);
+      } finally {
+         inflater.end();
+      }
+   }
+
+   private static void writeDefaultType(ByteSequence contents,
+                                        final boolean messageCompressed,
+                                        final ActiveMQBuffer body) throws IOException {
+      if (messageCompressed) {
+         contents = writeCompressedDefaultType(contents);
+      }
+      body.writeBytes(contents.data, contents.offset, contents.length);
+   }
+
+   private static ByteSequence writeCompressedDefaultType(final ByteSequence contents) throws IOException {
+      try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+           OutputStream os = new InflaterOutputStream(decompressed)) {
+         os.write(contents.data, contents.offset, contents.getLength());
+         return decompressed.toByteSequence();
+      } catch (Exception e) {
+         throw new IOException(e);
+      }
+   }
+
+   private static void putMsgBrokerPath(final BrokerId[] brokers, final CoreMessage coreMessage) {
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0, size = brokers.length; i < size; i++) {
+         builder.append(brokers[i].getValue());
+         if (i != (size - 1)) {
+            builder.append(','); //is this separator safe?
+         }
+      }
+      coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+   }
+
+   private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0, size = cluster.length; i < size; i++) {
+         builder.append(cluster[i].getValue());
+         if (i != (size - 1)) {
+            builder.append(','); //is this separator safe?
+         }
+      }
+      coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+   }
+
+   private static void putMsgDataStructure(final DataStructure ds,
+                                           final WireFormat marshaller,
+                                           final CoreMessage coreMessage) throws IOException {
+      final ByteSequence dsBytes = marshaller.marshal(ds);
+      dsBytes.compact();
+      coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
+   }
+
+   private static void putMsgProducerId(final ProducerId producerId,
+                                        final WireFormat marshaller,
+                                        final CoreMessage coreMessage) throws IOException {
+      final ByteSequence producerIdBytes = marshaller.marshal(producerId);
+      producerIdBytes.compact();
+      coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
+   }
+
+   private static void putMsgMarshalledProperties(final ByteSequence propBytes,
+                                                  final Message messageSend,
+                                                  final CoreMessage coreMessage) throws IOException {
+      propBytes.compact();
+      coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
+      //unmarshall properties to core so selector will work
+      final Map<String, Object> props = messageSend.getProperties();
+      if (!props.isEmpty()) {
+         props.forEach((key, value) -> {
+            try {
+               coreMessage.putObjectProperty(key, value);
+            } catch (ActiveMQPropertyConversionException e) {
+               coreMessage.putStringProperty(key, value.toString());
+            }
+         });
+      }
+   }
+
+   private static void putMsgReplyTo(final ActiveMQDestination replyTo,
+                                     final WireFormat marshaller,
+                                     final CoreMessage coreMessage) throws IOException {
+      final ByteSequence replyToBytes = marshaller.marshal(replyTo);
+      replyToBytes.compact();
+      coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
+   }
+
+   private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
+                                                 final WireFormat marshaller,
+                                                 final CoreMessage coreMessage) throws IOException {
+      final ByteSequence origDestBytes = marshaller.marshal(origDest);
+      origDestBytes.compact();
+      coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+   }
+
    private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
       for (Entry<String, Object> entry : map.entrySet()) {
          SimpleString key = new SimpleString(entry.getKey());