You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:09 UTC
[05/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
ARTEMIS-1616 OpenWire improvements
Refactored OpenWireMessageConverter::inbound into smaller methods
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54d01618
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54d01618
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54d01618
Branch: refs/heads/master
Commit: 54d0161850bc4543839d4f36b46f3bd84f5cd8e1
Parents: 2db4eaf
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 17:01:54 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 474 +++++++++++--------
1 file changed, 280 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54d01618/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 83ff6d6..54f3c99 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -123,10 +123,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
// @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
- Message messageSend = (Message) message;
- CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
+ final Message messageSend = (Message) message;
+ final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
- String type = messageSend.getType();
+ final String type = messageSend.getType();
if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
}
@@ -135,264 +135,350 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.setPriority(messageSend.getPriority());
coreMessage.setTimestamp(messageSend.getTimestamp());
- byte coreType = toCoreType(messageSend.getDataStructureType());
+ final byte coreType = toCoreType(messageSend.getDataStructureType());
coreMessage.setType(coreType);
- ActiveMQBuffer body = coreMessage.getBodyBuffer();
+ final ActiveMQBuffer body = coreMessage.getBodyBuffer();
- ByteSequence contents = messageSend.getContent();
+ final ByteSequence contents = messageSend.getContent();
if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null);
} else if (contents != null) {
- boolean messageCompressed = messageSend.isCompressed();
+ final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
}
switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
- InputStream tis = new ByteArrayInputStream(contents);
- if (messageCompressed) {
- tis = new InflaterInputStream(tis);
- }
- DataInputStream tdataIn = new DataInputStream(tis);
- String text = MarshallingSupport.readUTF8(tdataIn);
- tdataIn.close();
- body.writeNullableSimpleString(new SimpleString(text));
+ writeTextType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
- InputStream mis = new ByteArrayInputStream(contents);
- if (messageCompressed) {
- mis = new InflaterInputStream(mis);
- }
- DataInputStream mdataIn = new DataInputStream(mis);
- Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
- mdataIn.close();
- TypedProperties props = new TypedProperties();
- loadMapIntoProperties(props, map);
- props.encode(body.byteBuf());
+ writeMapType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
- if (messageCompressed) {
- try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
- org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- byte[] buf = new byte[1024];
- int n = ois.read(buf);
- while (n != -1) {
- decompressed.write(buf, 0, n);
- n = ois.read();
- }
- //read done
- contents = decompressed.toByteSequence();
- }
- }
- body.writeInt(contents.length);
- body.writeBytes(contents.data, contents.offset, contents.length);
+ writeObjectType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
- InputStream sis = new ByteArrayInputStream(contents);
- if (messageCompressed) {
- sis = new InflaterInputStream(sis);
- }
- DataInputStream sdis = new DataInputStream(sis);
- int stype = sdis.read();
- while (stype != -1) {
- switch (stype) {
- case MarshallingSupport.BOOLEAN_TYPE:
- body.writeByte(DataConstants.BOOLEAN);
- body.writeBoolean(sdis.readBoolean());
- break;
- case MarshallingSupport.BYTE_TYPE:
- body.writeByte(DataConstants.BYTE);
- body.writeByte(sdis.readByte());
- break;
- case MarshallingSupport.BYTE_ARRAY_TYPE:
- body.writeByte(DataConstants.BYTES);
- int slen = sdis.readInt();
- byte[] sbytes = new byte[slen];
- sdis.read(sbytes);
- body.writeInt(slen);
- body.writeBytes(sbytes);
- break;
- case MarshallingSupport.CHAR_TYPE:
- body.writeByte(DataConstants.CHAR);
- char schar = sdis.readChar();
- body.writeShort((short) schar);
- break;
- case MarshallingSupport.DOUBLE_TYPE:
- body.writeByte(DataConstants.DOUBLE);
- double sdouble = sdis.readDouble();
- body.writeLong(Double.doubleToLongBits(sdouble));
- break;
- case MarshallingSupport.FLOAT_TYPE:
- body.writeByte(DataConstants.FLOAT);
- float sfloat = sdis.readFloat();
- body.writeInt(Float.floatToIntBits(sfloat));
- break;
- case MarshallingSupport.INTEGER_TYPE:
- body.writeByte(DataConstants.INT);
- body.writeInt(sdis.readInt());
- break;
- case MarshallingSupport.LONG_TYPE:
- body.writeByte(DataConstants.LONG);
- body.writeLong(sdis.readLong());
- break;
- case MarshallingSupport.SHORT_TYPE:
- body.writeByte(DataConstants.SHORT);
- body.writeShort(sdis.readShort());
- break;
- case MarshallingSupport.STRING_TYPE:
- body.writeByte(DataConstants.STRING);
- String sstring = sdis.readUTF();
- body.writeNullableString(sstring);
- break;
- case MarshallingSupport.BIG_STRING_TYPE:
- body.writeByte(DataConstants.STRING);
- String sbigString = MarshallingSupport.readUTF8(sdis);
- body.writeNullableString(sbigString);
- break;
- case MarshallingSupport.NULL:
- body.writeByte(DataConstants.STRING);
- body.writeNullableString(null);
- break;
- default:
- //something we don't know, ignore
- break;
- }
- stype = sdis.read();
- }
- sdis.close();
+ writeStreamType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
- if (messageCompressed) {
- Inflater inflater = new Inflater();
- try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- int length = ByteSequenceData.readIntBig(contents);
- contents.offset = 0;
- byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
-
- inflater.setInput(data);
- byte[] buffer = new byte[length];
- int count = inflater.inflate(buffer);
- decompressed.write(buffer, 0, count);
- contents = decompressed.toByteSequence();
- } catch (Exception e) {
- throw new IOException(e);
- } finally {
- inflater.end();
- }
- }
- body.writeBytes(contents.data, contents.offset, contents.length);
+ writeBytesType(contents, messageCompressed, body);
break;
default:
- if (messageCompressed) {
- try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
- OutputStream os = new InflaterOutputStream(decompressed)) {
- os.write(contents.data, contents.offset, contents.getLength());
- contents = decompressed.toByteSequence();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- body.writeBytes(contents.data, contents.offset, contents.length);
+ writeDefaultType(contents, messageCompressed, body);
break;
}
}
//amq specific
coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
- BrokerId[] brokers = messageSend.getBrokerPath();
+ final BrokerId[] brokers = messageSend.getBrokerPath();
if (brokers != null) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < brokers.length; i++) {
- builder.append(brokers[i].getValue());
- if (i != (brokers.length - 1)) {
- builder.append(","); //is this separator safe?
- }
- }
- coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+ putMsgBrokerPath(brokers, coreMessage);
}
- BrokerId[] cluster = messageSend.getCluster();
+ final BrokerId[] cluster = messageSend.getCluster();
if (cluster != null) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < cluster.length; i++) {
- builder.append(cluster[i].getValue());
- if (i != (cluster.length - 1)) {
- builder.append(","); //is this separator safe?
- }
- }
- coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+ putMsgCluster(cluster, coreMessage);
}
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
- String corrId = messageSend.getCorrelationId();
+ final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
coreMessage.putStringProperty("JMSCorrelationID", corrId);
}
- DataStructure ds = messageSend.getDataStructure();
+ final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
- ByteSequence dsBytes = marshaller.marshal(ds);
- dsBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
+ putMsgDataStructure(ds, marshaller, coreMessage);
}
- String groupId = messageSend.getGroupID();
+ final String groupId = messageSend.getGroupID();
if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
- MessageId messageId = messageSend.getMessageId();
+ final MessageId messageId = messageSend.getMessageId();
- ByteSequence midBytes = marshaller.marshal(messageId);
+ final ByteSequence midBytes = marshaller.marshal(messageId);
midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
- ProducerId producerId = messageSend.getProducerId();
+ final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
- ByteSequence producerIdBytes = marshaller.marshal(producerId);
- producerIdBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
+ putMsgProducerId(producerId, marshaller, coreMessage);
}
- ByteSequence propBytes = messageSend.getMarshalledProperties();
+ final ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) {
- propBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
- //unmarshall properties to core so selector will work
- Map<String, Object> props = messageSend.getProperties();
- //Map<String, Object> props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes)));
- for (Entry<String, Object> ent : props.entrySet()) {
- Object value = ent.getValue();
- try {
- coreMessage.putObjectProperty(ent.getKey(), value);
- } catch (ActiveMQPropertyConversionException e) {
- coreMessage.putStringProperty(ent.getKey(), value.toString());
- }
- }
+ putMsgMarshalledProperties(propBytes, messageSend, coreMessage);
}
- ActiveMQDestination replyTo = messageSend.getReplyTo();
+ final ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
- ByteSequence replyToBytes = marshaller.marshal(replyTo);
- replyToBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
+ putMsgReplyTo(replyTo, marshaller, coreMessage);
}
- String userId = messageSend.getUserID();
+ final String userId = messageSend.getUserID();
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
- ActiveMQDestination origDest = messageSend.getOriginalDestination();
+ final ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
- ByteSequence origDestBytes = marshaller.marshal(origDest);
- origDestBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+ putMsgOriginalDestination(origDest, marshaller, coreMessage);
}
return coreMessage;
}
+ private static void writeTextType(final ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ InputStream tis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ tis = new InflaterInputStream(tis);
+ }
+ DataInputStream tdataIn = new DataInputStream(tis);
+ String text = MarshallingSupport.readUTF8(tdataIn);
+ tdataIn.close();
+ body.writeNullableSimpleString(new SimpleString(text));
+ }
+
+ private static void writeMapType(final ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ InputStream mis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ mis = new InflaterInputStream(mis);
+ }
+ DataInputStream mdataIn = new DataInputStream(mis);
+ Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
+ mdataIn.close();
+ TypedProperties props = new TypedProperties();
+ loadMapIntoProperties(props, map);
+ props.encode(body.byteBuf());
+ }
+
+ private static void writeObjectType(ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ if (messageCompressed) {
+ contents = writeCompressedObjectType(contents);
+ }
+ body.writeInt(contents.length);
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ }
+
+ private static ByteSequence writeCompressedObjectType(final ByteSequence contents) throws IOException {
+ try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
+ org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ byte[] buf = new byte[1024];
+ int n = ois.read(buf);
+ while (n != -1) {
+ decompressed.write(buf, 0, n);
+ n = ois.read();
+ }
+ //read done
+ return decompressed.toByteSequence();
+ }
+ }
+
+ private static void writeStreamType(final ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ InputStream sis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ sis = new InflaterInputStream(sis);
+ }
+ DataInputStream sdis = new DataInputStream(sis);
+ int stype = sdis.read();
+ while (stype != -1) {
+ switch (stype) {
+ case MarshallingSupport.BOOLEAN_TYPE:
+ body.writeByte(DataConstants.BOOLEAN);
+ body.writeBoolean(sdis.readBoolean());
+ break;
+ case MarshallingSupport.BYTE_TYPE:
+ body.writeByte(DataConstants.BYTE);
+ body.writeByte(sdis.readByte());
+ break;
+ case MarshallingSupport.BYTE_ARRAY_TYPE:
+ body.writeByte(DataConstants.BYTES);
+ int slen = sdis.readInt();
+ byte[] sbytes = new byte[slen];
+ sdis.read(sbytes);
+ body.writeInt(slen);
+ body.writeBytes(sbytes);
+ break;
+ case MarshallingSupport.CHAR_TYPE:
+ body.writeByte(DataConstants.CHAR);
+ char schar = sdis.readChar();
+ body.writeShort((short) schar);
+ break;
+ case MarshallingSupport.DOUBLE_TYPE:
+ body.writeByte(DataConstants.DOUBLE);
+ double sdouble = sdis.readDouble();
+ body.writeLong(Double.doubleToLongBits(sdouble));
+ break;
+ case MarshallingSupport.FLOAT_TYPE:
+ body.writeByte(DataConstants.FLOAT);
+ float sfloat = sdis.readFloat();
+ body.writeInt(Float.floatToIntBits(sfloat));
+ break;
+ case MarshallingSupport.INTEGER_TYPE:
+ body.writeByte(DataConstants.INT);
+ body.writeInt(sdis.readInt());
+ break;
+ case MarshallingSupport.LONG_TYPE:
+ body.writeByte(DataConstants.LONG);
+ body.writeLong(sdis.readLong());
+ break;
+ case MarshallingSupport.SHORT_TYPE:
+ body.writeByte(DataConstants.SHORT);
+ body.writeShort(sdis.readShort());
+ break;
+ case MarshallingSupport.STRING_TYPE:
+ body.writeByte(DataConstants.STRING);
+ String sstring = sdis.readUTF();
+ body.writeNullableString(sstring);
+ break;
+ case MarshallingSupport.BIG_STRING_TYPE:
+ body.writeByte(DataConstants.STRING);
+ String sbigString = MarshallingSupport.readUTF8(sdis);
+ body.writeNullableString(sbigString);
+ break;
+ case MarshallingSupport.NULL:
+ body.writeByte(DataConstants.STRING);
+ body.writeNullableString(null);
+ break;
+ default:
+ //something we don't know, ignore
+ break;
+ }
+ stype = sdis.read();
+ }
+ sdis.close();
+ }
+
+ private static void writeBytesType(ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ if (messageCompressed) {
+ contents = writeCompressedBytesType(contents);
+ }
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ }
+
+ private static ByteSequence writeCompressedBytesType(final ByteSequence contents) throws IOException {
+ Inflater inflater = new Inflater();
+ try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ int length = ByteSequenceData.readIntBig(contents);
+ contents.offset = 0;
+ byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
+
+ inflater.setInput(data);
+ byte[] buffer = new byte[length];
+ int count = inflater.inflate(buffer);
+ decompressed.write(buffer, 0, count);
+ return decompressed.toByteSequence();
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ inflater.end();
+ }
+ }
+
+ private static void writeDefaultType(ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ if (messageCompressed) {
+ contents = writeCompressedDefaultType(contents);
+ }
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ }
+
+ /**
+  * Inflates a compressed payload by pushing it through an {@link InflaterOutputStream}.
+  *
+  * @param contents compressed content of the OpenWire message
+  * @return a byte sequence holding the inflated content
+  * @throws IOException wrapping any failure raised while inflating
+  */
+ private static ByteSequence writeCompressedDefaultType(final ByteSequence contents) throws IOException {
+    try (org.apache.activemq.util.ByteArrayOutputStream inflated = new org.apache.activemq.util.ByteArrayOutputStream();
+         OutputStream inflating = new InflaterOutputStream(inflated)) {
+       inflating.write(contents.data, contents.offset, contents.getLength());
+       return inflated.toByteSequence();
+    } catch (Exception cause) {
+       throw new IOException(cause);
+    }
+ }
+
+ /**
+  * Stores the broker path on the core message as a comma-separated list of broker id values
+  * under {@code AMQ_MSG_BROKER_PATH}.
+  *
+  * @param brokers     broker ids of the OpenWire message, in path order
+  * @param coreMessage core message receiving the property
+  */
+ private static void putMsgBrokerPath(final BrokerId[] brokers, final CoreMessage coreMessage) {
+    final StringBuilder joined = new StringBuilder();
+    boolean first = true;
+    for (final BrokerId broker : brokers) {
+       if (!first) {
+          joined.append(','); //is this separator safe?
+       }
+       joined.append(broker.getValue());
+       first = false;
+    }
+    coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, joined.toString());
+ }
+
+ /**
+  * Stores the cluster on the core message as a comma-separated list of broker id values
+  * under {@code AMQ_MSG_CLUSTER}.
+  *
+  * @param cluster     broker ids of the OpenWire message's cluster, in array order
+  * @param coreMessage core message receiving the property
+  */
+ private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
+    final StringBuilder joined = new StringBuilder();
+    boolean first = true;
+    for (final BrokerId member : cluster) {
+       if (!first) {
+          joined.append(','); //is this separator safe?
+       }
+       joined.append(member.getValue());
+       first = false;
+    }
+    coreMessage.putStringProperty(AMQ_MSG_CLUSTER, joined.toString());
+ }
+
+ /**
+  * Marshals the message's data structure and stores the resulting bytes on the core message
+  * under {@code AMQ_MSG_DATASTRUCTURE}.
+  *
+  * @param ds          data structure of the OpenWire message
+  * @param marshaller  wire format used to marshal {@code ds}
+  * @param coreMessage core message receiving the property
+  * @throws IOException if marshalling fails
+  */
+ private static void putMsgDataStructure(final DataStructure ds,
+                                         final WireFormat marshaller,
+                                         final CoreMessage coreMessage) throws IOException {
+    final ByteSequence marshalled = marshaller.marshal(ds);
+    // compact() runs before the raw array is read - presumably it trims the array to the used range
+    marshalled.compact();
+    final byte[] payload = marshalled.data;
+    coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, payload);
+ }
+
+ /**
+  * Marshals the producer id and stores the resulting bytes on the core message
+  * under {@code AMQ_MSG_PRODUCER_ID}.
+  *
+  * @param producerId  producer id of the OpenWire message
+  * @param marshaller  wire format used to marshal {@code producerId}
+  * @param coreMessage core message receiving the property
+  * @throws IOException if marshalling fails
+  */
+ private static void putMsgProducerId(final ProducerId producerId,
+                                      final WireFormat marshaller,
+                                      final CoreMessage coreMessage) throws IOException {
+    final ByteSequence marshalled = marshaller.marshal(producerId);
+    // compact() runs before the raw array is read - presumably it trims the array to the used range
+    marshalled.compact();
+    final byte[] payload = marshalled.data;
+    coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, payload);
+ }
+
+ /**
+  * Stores the marshalled OpenWire properties on the core message under
+  * {@code AMQ_MSG_MARSHALL_PROP} and also copies every property individually, falling back
+  * to the value's string form when the core message rejects its type.
+  *
+  * @param propBytes   marshalled properties of the OpenWire message; compacted in place
+  * @param messageSend OpenWire message the properties are read from
+  * @param coreMessage core message receiving the properties
+  * @throws IOException if the properties of {@code messageSend} cannot be read
+  */
+ private static void putMsgMarshalledProperties(final ByteSequence propBytes,
+                                                final Message messageSend,
+                                                final CoreMessage coreMessage) throws IOException {
+    propBytes.compact();
+    coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
+    //unmarshall properties to core so selector will work
+    for (final Entry<String, Object> property : messageSend.getProperties().entrySet()) {
+       final String name = property.getKey();
+       final Object value = property.getValue();
+       try {
+          coreMessage.putObjectProperty(name, value);
+       } catch (ActiveMQPropertyConversionException e) {
+          coreMessage.putStringProperty(name, value.toString());
+       }
+    }
+ }
+
+ /**
+  * Marshals the reply-to destination and stores the resulting bytes on the core message
+  * under {@code AMQ_MSG_REPLY_TO}.
+  *
+  * @param replyTo     reply-to destination of the OpenWire message
+  * @param marshaller  wire format used to marshal {@code replyTo}
+  * @param coreMessage core message receiving the property
+  * @throws IOException if marshalling fails
+  */
+ private static void putMsgReplyTo(final ActiveMQDestination replyTo,
+                                   final WireFormat marshaller,
+                                   final CoreMessage coreMessage) throws IOException {
+    final ByteSequence marshalled = marshaller.marshal(replyTo);
+    // compact() runs before the raw array is read - presumably it trims the array to the used range
+    marshalled.compact();
+    final byte[] payload = marshalled.data;
+    coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, payload);
+ }
+
+ /**
+  * Marshals the original destination and stores the resulting bytes on the core message
+  * under {@code AMQ_MSG_ORIG_DESTINATION}.
+  *
+  * @param origDest    original destination of the OpenWire message
+  * @param marshaller  wire format used to marshal {@code origDest}
+  * @param coreMessage core message receiving the property
+  * @throws IOException if marshalling fails
+  */
+ private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
+                                               final WireFormat marshaller,
+                                               final CoreMessage coreMessage) throws IOException {
+    final ByteSequence marshalled = marshaller.marshal(origDest);
+    // compact() runs before the raw array is read - presumably it trims the array to the used range
+    marshalled.compact();
+    final byte[] payload = marshalled.data;
+    coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, payload);
+ }
+
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
for (Entry<String, Object> entry : map.entrySet()) {
SimpleString key = new SimpleString(entry.getKey());