You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:10 UTC

[06/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

ARTEMIS-1616 OpenWire improvements

Use SimpleString constants and values in OpenWireMessageConverter to avoid String-to-SimpleString conversions when setting and reading CoreMessage properties


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64724c35
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64724c35
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64724c35

Branch: refs/heads/master
Commit: 64724c3520586c6cd1bc0aec9942ae5bb5562459
Parents: 17c0a33
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 22:24:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 331 ++++++++++---------
 .../core/protocol/openwire/amq/AMQConsumer.java |   2 +-
 2 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 54f3c99..457593d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -71,29 +71,31 @@ import org.fusesource.hawtbuf.UTF8Buffer;
 
 public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
 
-   public static final String AMQ_PREFIX = "__HDR_";
-   public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
-
-   private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL";
-   private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME";
-
-   private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH";
-   private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
-   private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
-   private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
-   private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString();
-   private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
-   private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
-   private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
-   private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
-   private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
-   private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
-   private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
-
-   private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
-
-   private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
-   private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
+   private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
+   private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
+   private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
+   public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
+
+   private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
+   private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
+
+   private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
+   private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
+   private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
+   private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
+   private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
+   private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
+   private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
+   private static final SimpleString AMQ_MSG_ORIG_DESTINATION =  new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
+   private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
+   private static final SimpleString AMQ_MSG_PRODUCER_ID =  new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
+   private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP");
+   private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
+
+   private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
+
+   private static final SimpleString AMQ_MSG_DROPPABLE =  new SimpleString(AMQ_PREFIX + "DROPPABLE");
+   private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
 
    private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
 
@@ -128,7 +130,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       final String type = messageSend.getType();
       if (type != null) {
-         coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
+         coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type));
       }
       coreMessage.setDurable(messageSend.isPersistent());
       coreMessage.setExpiration(messageSend.getExpiration());
@@ -185,7 +187,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
       final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
-         coreMessage.putStringProperty("JMSCorrelationID", corrId);
+         coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
       }
       final DataStructure ds = messageSend.getDataStructure();
       if (ds != null) {
@@ -193,7 +195,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       final String groupId = messageSend.getGroupID();
       if (groupId != null) {
-         coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
+         coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId));
       }
       coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
 
@@ -219,7 +221,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       final String userId = messageSend.getUserID();
       if (userId != null) {
-         coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
+         coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId));
       }
       coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
 
@@ -415,7 +417,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             builder.append(','); //is this separator safe?
          }
       }
-      coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+      coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
    }
 
    private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
@@ -426,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             builder.append(','); //is this separator safe?
          }
       }
-      coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+      coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
    }
 
    private static void putMsgDataStructure(final DataStructure ds,
@@ -557,7 +559,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
       }
 
-      String type = coreMessage.getStringProperty(new SimpleString("JMSType"));
+      String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
       if (type != null) {
          amqMsg.setJMSType(type);
       }
@@ -580,156 +582,155 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       byte[] bytes = null;
       if (buffer != null) {
          buffer.resetReaderIndex();
-         synchronized (buffer) {
-            if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
-               SimpleString text = buffer.readNullableSimpleString();
-               if (text != null) {
-                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
-                  OutputStream out = bytesOut;
-                  if (isCompressed) {
-                     out = new DeflaterOutputStream(out, true);
-                  }
-                  try (DataOutputStream dataOut = new DataOutputStream(out)) {
-                     MarshallingSupport.writeUTF8(dataOut, text.toString());
-                     dataOut.flush();
-                     bytes = bytesOut.toByteArray();
-                  }
-               }
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
-               TypedProperties mapData = new TypedProperties();
-               //it could be a null map
-               if (buffer.readableBytes() > 0) {
-                  mapData.decode(buffer.byteBuf());
-                  Map<String, Object> map = mapData.getMap();
-                  ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
-                  OutputStream os = out;
-                  if (isCompressed) {
-                     os = new DeflaterOutputStream(os, true);
-                  }
-                  try (DataOutputStream dataOut = new DataOutputStream(os)) {
-                     MarshallingSupport.marshalPrimitiveMap(map, dataOut);
-                     dataOut.flush();
-                  }
-                  bytes = out.toByteArray();
-               }
 
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
-               if (buffer.readableBytes() > 0) {
-                  int len = buffer.readInt();
-                  bytes = new byte[len];
-                  buffer.readBytes(bytes);
-                  if (isCompressed) {
-                     ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                     try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
-                        out.write(bytes);
-                        out.flush();
-                     }
-                     bytes = bytesOut.toByteArray();
-                  }
-               }
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
-               org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+         if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
+            SimpleString text = buffer.readNullableSimpleString();
+            if (text != null) {
+               ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
                OutputStream out = bytesOut;
                if (isCompressed) {
-                  out = new DeflaterOutputStream(bytesOut, true);
+                  out = new DeflaterOutputStream(out, true);
                }
                try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
-                  boolean stop = false;
-                  while (!stop && buffer.readable()) {
-                     byte primitiveType = buffer.readByte();
-                     switch (primitiveType) {
-                        case DataConstants.BOOLEAN:
-                           MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
-                           break;
-                        case DataConstants.BYTE:
-                           MarshallingSupport.marshalByte(dataOut, buffer.readByte());
-                           break;
-                        case DataConstants.BYTES:
-                           int len = buffer.readInt();
-                           byte[] bytesData = new byte[len];
-                           buffer.readBytes(bytesData);
-                           MarshallingSupport.marshalByteArray(dataOut, bytesData);
-                           break;
-                        case DataConstants.CHAR:
-                           char ch = (char) buffer.readShort();
-                           MarshallingSupport.marshalChar(dataOut, ch);
-                           break;
-                        case DataConstants.DOUBLE:
-                           double doubleVal = Double.longBitsToDouble(buffer.readLong());
-                           MarshallingSupport.marshalDouble(dataOut, doubleVal);
-                           break;
-                        case DataConstants.FLOAT:
-                           Float floatVal = Float.intBitsToFloat(buffer.readInt());
-                           MarshallingSupport.marshalFloat(dataOut, floatVal);
-                           break;
-                        case DataConstants.INT:
-                           MarshallingSupport.marshalInt(dataOut, buffer.readInt());
-                           break;
-                        case DataConstants.LONG:
-                           MarshallingSupport.marshalLong(dataOut, buffer.readLong());
-                           break;
-                        case DataConstants.SHORT:
-                           MarshallingSupport.marshalShort(dataOut, buffer.readShort());
-                           break;
-                        case DataConstants.STRING:
-                           String string = buffer.readNullableString();
-                           if (string == null) {
-                              MarshallingSupport.marshalNull(dataOut);
-                           } else {
-                              MarshallingSupport.marshalString(dataOut, string);
-                           }
-                           break;
-                        default:
-                           //now we stop
-                           stop = true;
-                           break;
-                     }
-                     dataOut.flush();
-                  }
+                  MarshallingSupport.writeUTF8(dataOut, text.toString());
+                  dataOut.flush();
+                  bytes = bytesOut.toByteArray();
                }
-               bytes = bytesOut.toByteArray();
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
-               int n = buffer.readableBytes();
-               bytes = new byte[n];
-               buffer.readBytes(bytes);
+            }
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
+            TypedProperties mapData = new TypedProperties();
+            //it could be a null map
+            if (buffer.readableBytes() > 0) {
+               mapData.decode(buffer.byteBuf());
+               Map<String, Object> map = mapData.getMap();
+               ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+               OutputStream os = out;
                if (isCompressed) {
-                  int length = bytes.length;
-                  Deflater deflater = new Deflater();
-                  try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                     compressed.write(new byte[4]);
-                     deflater.setInput(bytes);
-                     deflater.finish();
-                     byte[] bytesBuf = new byte[1024];
-                     while (!deflater.finished()) {
-                        int count = deflater.deflate(bytesBuf);
-                        compressed.write(bytesBuf, 0, count);
-                     }
-                     compressed.flush();
-                     ByteSequence byteSeq = compressed.toByteSequence();
-                     ByteSequenceData.writeIntBig(byteSeq, length);
-                     bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
-                  } finally {
-                     deflater.end();
-                  }
+                  os = new DeflaterOutputStream(os, true);
+               }
+               try (DataOutputStream dataOut = new DataOutputStream(os)) {
+                  MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+                  dataOut.flush();
                }
-            } else {
-               int n = buffer.readableBytes();
-               bytes = new byte[n];
+               bytes = out.toByteArray();
+            }
+
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
+            if (buffer.readableBytes() > 0) {
+               int len = buffer.readInt();
+               bytes = new byte[len];
                buffer.readBytes(bytes);
                if (isCompressed) {
-                  try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                       DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                  try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
                      out.write(bytes);
                      out.flush();
-                     bytes = bytesOut.toByteArray();
                   }
+                  bytes = bytesOut.toByteArray();
+               }
+            }
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
+            org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+            OutputStream out = bytesOut;
+            if (isCompressed) {
+               out = new DeflaterOutputStream(bytesOut, true);
+            }
+            try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+               boolean stop = false;
+               while (!stop && buffer.readable()) {
+                  byte primitiveType = buffer.readByte();
+                  switch (primitiveType) {
+                     case DataConstants.BOOLEAN:
+                        MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+                        break;
+                     case DataConstants.BYTE:
+                        MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+                        break;
+                     case DataConstants.BYTES:
+                        int len = buffer.readInt();
+                        byte[] bytesData = new byte[len];
+                        buffer.readBytes(bytesData);
+                        MarshallingSupport.marshalByteArray(dataOut, bytesData);
+                        break;
+                     case DataConstants.CHAR:
+                        char ch = (char) buffer.readShort();
+                        MarshallingSupport.marshalChar(dataOut, ch);
+                        break;
+                     case DataConstants.DOUBLE:
+                        double doubleVal = Double.longBitsToDouble(buffer.readLong());
+                        MarshallingSupport.marshalDouble(dataOut, doubleVal);
+                        break;
+                     case DataConstants.FLOAT:
+                        Float floatVal = Float.intBitsToFloat(buffer.readInt());
+                        MarshallingSupport.marshalFloat(dataOut, floatVal);
+                        break;
+                     case DataConstants.INT:
+                        MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+                        break;
+                     case DataConstants.LONG:
+                        MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+                        break;
+                     case DataConstants.SHORT:
+                        MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+                        break;
+                     case DataConstants.STRING:
+                        String string = buffer.readNullableString();
+                        if (string == null) {
+                           MarshallingSupport.marshalNull(dataOut);
+                        } else {
+                           MarshallingSupport.marshalString(dataOut, string);
+                        }
+                        break;
+                     default:
+                        //now we stop
+                        stop = true;
+                        break;
+                  }
+                  dataOut.flush();
+               }
+            }
+            bytes = bytesOut.toByteArray();
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+            int n = buffer.readableBytes();
+            bytes = new byte[n];
+            buffer.readBytes(bytes);
+            if (isCompressed) {
+               int length = bytes.length;
+               Deflater deflater = new Deflater();
+               try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+                  compressed.write(new byte[4]);
+                  deflater.setInput(bytes);
+                  deflater.finish();
+                  byte[] bytesBuf = new byte[1024];
+                  while (!deflater.finished()) {
+                     int count = deflater.deflate(bytesBuf);
+                     compressed.write(bytesBuf, 0, count);
+                  }
+                  compressed.flush();
+                  ByteSequence byteSeq = compressed.toByteSequence();
+                  ByteSequenceData.writeIntBig(byteSeq, length);
+                  bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+               } finally {
+                  deflater.end();
+               }
+            }
+         } else {
+            int n = buffer.readableBytes();
+            bytes = new byte[n];
+            buffer.readBytes(bytes);
+            if (isCompressed) {
+               try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                    DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+                  out.write(bytes);
+                  out.flush();
+                  bytes = bytesOut.toByteArray();
                }
             }
-
-            buffer.resetReaderIndex();// this is important for topics as the buffer
-            // may be read multiple times
          }
+
+         buffer.resetReaderIndex();// this is important for topics as the buffer
+         // may be read multiple times
       }
 
       //we need check null because messages may come from other clients
@@ -767,7 +768,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setCommandId(commandId);
 
-      SimpleString corrId = (SimpleString) coreMessage.getObjectProperty("JMSCorrelationID");
+      SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
       if (corrId != null) {
          amqMsg.setCorrelationId(corrId.toString());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 57865b7..77051cc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -286,7 +286,7 @@ public class AMQConsumer {
             for (MessageReference ref : ackList) {
                Throwable poisonCause = ack.getPoisonCause();
                if (poisonCause != null) {
-                  ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
+                  ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
                }
                ref.getQueue().sendToDeadLetterAddress(transaction, ref);
             }