You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/03/23 09:48:57 UTC

[activemq-artemis] branch master updated: ARTEMIS-3175 - implement address setting management-message-attribute-size-limit to sensibly limit data returned by list/browse/filter management ops

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 02bb703  ARTEMIS-3175 - implement address setting management-message-attribute-size-limit to sensibly limit data returned by list/browse/filter management ops
02bb703 is described below

commit 02bb7031c296ef22d1e58906836b1e579988d0cb
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Mar 12 12:18:00 2021 +0000

    ARTEMIS-3175 - implement address setting management-message-attribute-size-limit to sensibly limit data returned by list/browse/filter management ops
---
 .../activemq/artemis/api/core/ICoreMessage.java    |   5 +-
 .../apache/activemq/artemis/api/core/JsonUtil.java |  92 ++++++++--------
 .../apache/activemq/artemis/api/core/Message.java  |  20 +++-
 .../activemq/artemis/message/CoreMessageTest.java  |  27 +++++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |   4 +-
 .../deployers/impl/FileConfigurationParser.java    |   4 +
 .../core/management/impl/QueueControlImpl.java     |  23 ++--
 .../management/impl/openmbean/OpenTypeSupport.java |  94 +++++++---------
 .../core/settings/impl/AddressSettings.java        |  33 +++++-
 .../transaction/impl/CoreTransactionDetail.java    |   2 +-
 .../resources/schema/artemis-configuration.xsd     |  10 +-
 .../resources/ConfigurationTest-full-config.xml    |   2 +
 .../tests/integration/amqp/JMXManagementTest.java  |  20 +++-
 .../integration/management/QueueControlTest.java   | 118 ++++++++++++++++++++-
 14 files changed, 336 insertions(+), 118 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index d367985..71d4f5f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -102,10 +102,11 @@ public interface ICoreMessage extends Message {
 
    /**
     * @return Returns the message in Map form, useful when encoding to JSON
+    * @param valueSizeLimit
     */
    @Override
-   default Map<String, Object> toMap() {
-      Map map = toPropertyMap();
+   default Map<String, Object> toMap(int valueSizeLimit) {
+      Map map = toPropertyMap(valueSizeLimit);
       map.put("messageID", getMessageID());
       Object userID = getUserID();
       if (getUserID() != null) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
index e017be2..0576cc9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
@@ -28,6 +28,7 @@ import javax.management.openmbean.CompositeDataSupport;
 import java.io.ByteArrayInputStream;
 import java.io.StringReader;
 import java.lang.reflect.Array;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,52 +46,7 @@ public final class JsonUtil {
       JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
 
       for (Object parameter : array) {
-         if (parameter instanceof Map) {
-            Map<String, Object> map = (Map<String, Object>) parameter;
-
-            JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder();
-
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-               String key = entry.getKey();
-
-               Object val = entry.getValue();
-
-               if (val != null) {
-                  if (val.getClass().isArray()) {
-                     JsonArray objectArray = toJSONArray((Object[]) val);
-                     jsonObject.add(key, objectArray);
-                  } else {
-                     addToObject(key, val, jsonObject);
-                  }
-               }
-            }
-            jsonArray.add(jsonObject);
-         } else {
-            if (parameter != null) {
-               Class<?> clz = parameter.getClass();
-
-               if (clz.isArray()) {
-                  Object[] innerArray = (Object[]) parameter;
-
-                  if (innerArray instanceof CompositeData[]) {
-                     JsonArrayBuilder innerJsonArray = JsonLoader.createArrayBuilder();
-                     for (Object data : innerArray) {
-                        String s = Base64.encodeObject((CompositeDataSupport) data);
-                        innerJsonArray.add(s);
-                     }
-                     JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder();
-                     jsonObject.add(CompositeData.class.getName(), innerJsonArray);
-                     jsonArray.add(jsonObject);
-                  } else {
-                     jsonArray.add(toJSONArray(innerArray));
-                  }
-               } else {
-                  addToArray(parameter, jsonArray);
-               }
-            } else {
-               jsonArray.addNull();
-            }
-         }
+         addToArray(parameter, jsonArray);
       }
       return jsonArray.build();
    }
@@ -210,6 +166,12 @@ public final class JsonUtil {
       } else if (param instanceof byte[]) {
          JsonArrayBuilder byteArrayObject = toJsonArrayBuilder((byte[]) param);
          jsonObjectBuilder.add(key, byteArrayObject);
+      } else if (param instanceof Object[]) {
+         final JsonArrayBuilder objectArrayBuilder = JsonLoader.createArrayBuilder();
+         for (Object parameter : (Object[])param) {
+            addToArray(parameter, objectArrayBuilder);
+         }
+         jsonObjectBuilder.add(key, objectArrayBuilder);
       } else {
          throw ActiveMQClientMessageBundle.BUNDLE.invalidManagementParam(param.getClass().getName());
       }
@@ -238,6 +200,21 @@ public final class JsonUtil {
       } else if (param instanceof byte[]) {
          JsonArrayBuilder byteArrayObject = toJsonArrayBuilder((byte[]) param);
          jsonArrayBuilder.add(byteArrayObject);
+      } else if (param instanceof CompositeData[]) {
+         JsonArrayBuilder innerJsonArray = JsonLoader.createArrayBuilder();
+         for (Object data : (CompositeData[])param) {
+            String s = Base64.encodeObject((CompositeDataSupport) data);
+            innerJsonArray.add(s);
+         }
+         JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder();
+         jsonObject.add(CompositeData.class.getName(), innerJsonArray);
+         jsonArrayBuilder.add(jsonObject);
+      } else if (param instanceof Object[]) {
+         JsonArrayBuilder objectArrayBuilder = JsonLoader.createArrayBuilder();
+         for (Object parameter : (Object[])param) {
+            addToArray(parameter, objectArrayBuilder);
+         }
+         jsonArrayBuilder.add(objectArrayBuilder);
       } else {
          throw ActiveMQClientMessageBundle.BUNDLE.invalidManagementParam(param.getClass().getName());
       }
@@ -345,6 +322,29 @@ public final class JsonUtil {
    private JsonUtil() {
    }
 
+   public static Object truncate(final Object value, final int valueSizeLimit) {
+      Object result = value;
+      if (valueSizeLimit >= 0) {
+         if (String.class.equals(value.getClass())) {
+            String str = (String) value;
+            if (str.length() > valueSizeLimit) {
+               result = new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
+            }
+         } else if (value.getClass().isArray()) {
+            if (byte[].class.equals(value.getClass())) {
+               if (((byte[]) value).length > valueSizeLimit) {
+                  result = Arrays.copyOfRange((byte[]) value, 0, valueSizeLimit);
+               }
+            } else if (char[].class.equals(value.getClass())) {
+               if (((char[]) value).length > valueSizeLimit) {
+                  result = Arrays.copyOfRange((char[]) value, 0, valueSizeLimit);
+               }
+            }
+         }
+      }
+      return result;
+   }
+
    private static class NullableJsonString implements JsonValue, JsonString {
 
       private final String value;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 2493747..b5213e9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -708,7 +708,15 @@ public interface Message {
     * @return Returns the message in Map form, useful when encoding to JSON
     */
    default Map<String, Object> toMap() {
-      Map map = toPropertyMap();
+      return toMap(-1);
+   }
+
+   /**
+    * @return Returns the message in Map form, useful when encoding to JSON
+    * @param valueSizeLimit that limits [] map values
+    */
+   default Map<String, Object> toMap(int valueSizeLimit) {
+      Map map = toPropertyMap(valueSizeLimit);
       map.put("messageID", getMessageID());
       Object userID = getUserID();
       if (getUserID() != null) {
@@ -728,6 +736,14 @@ public interface Message {
     * @return Returns the message properties in Map form, useful when encoding to JSON
     */
    default Map<String, Object> toPropertyMap() {
+      return toPropertyMap(-1);
+   }
+
+   /**
+    * @return Returns the message properties in Map form, useful when encoding to JSON
+    * @param valueSizeLimit that limits [] map values
+    */
+   default Map<String, Object> toPropertyMap(int valueSizeLimit) {
       Map map = new HashMap<>();
       for (SimpleString name : getPropertyNames()) {
          Object value = getObjectProperty(name.toString());
@@ -735,12 +751,12 @@ public interface Message {
          if (value instanceof SimpleString) {
             value = value.toString();
          }
+         value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
       return map;
    }
 
-
    /** This should make you convert your message into Core format. */
    ICoreMessage toCore();
 
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index 0735e1e..c52daa4 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.artemis.message;
 
+import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 
 import io.netty.buffer.ByteBuf;
@@ -212,6 +213,32 @@ public class CoreMessageTest {
    }
 
    @Test
+   public void testToMapLimit() throws Exception {
+
+      CoreMessage coreMessage = new CoreMessage().initBuffer(BIGGER_TEXT.length() * 2);
+      coreMessage.putStringProperty("prop", BIGGER_TEXT);
+      coreMessage.putBytesProperty("bytesProp", BIGGER_TEXT.getBytes(StandardCharsets.UTF_8));
+
+      Assert.assertEquals(BIGGER_TEXT.length(), ((String)coreMessage.toMap().get("prop")).length());
+      Assert.assertEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toMap().get("bytesProp")).length);
+
+      // limit the values
+      Assert.assertNotEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toMap(40).get("bytesProp")).length);
+      String mapVal = ((String)coreMessage.toMap(40).get("prop"));
+      Assert.assertNotEquals(BIGGER_TEXT.length(), mapVal.length());
+      Assert.assertTrue(mapVal.contains("more"));
+
+      mapVal = ((String)coreMessage.toMap(0).get("prop"));
+      Assert.assertNotEquals(BIGGER_TEXT.length(), mapVal.length());
+      Assert.assertTrue(mapVal.contains("more"));
+
+      Assert.assertEquals(BIGGER_TEXT.length(), Integer.parseInt(mapVal.substring(mapVal.lastIndexOf('+') + 1, mapVal.lastIndexOf('m')).trim()));
+
+      Assert.assertEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toPropertyMap().get("bytesProp")).length);
+      Assert.assertNotEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toPropertyMap(40).get("bytesProp")).length);
+   }
+
+   @Test
    public void testSaveReceiveLimitedBytes() {
       CoreMessage empty = new CoreMessage().initBuffer(100);
       empty.getBodyBuffer().writeByte((byte)7);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 25e983c..f32c285 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -835,7 +836,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
    public abstract int getMemoryEstimate();
 
    @Override
-   public Map<String, Object> toPropertyMap() {
+   public Map<String, Object> toPropertyMap(int valueSizeLimit) {
       Map map = new HashMap<>();
       for (SimpleString name : getPropertyNames()) {
          Object value = getObjectProperty(name.toString());
@@ -843,6 +844,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
          if (value instanceof Binary) {
             value = ((Binary)value).getArray();
          }
+         value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
       return map;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6994860..f6e3019 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -281,6 +281,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size";
 
+   private static final String MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = "management-message-attribute-size-limit";
+
    private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
 
    private static final String MAX_QUEUES_NODE_NAME = "max-queues";
@@ -1250,6 +1252,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setConfigDeleteAddresses(policy);
          } else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
             addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
+         } else if (MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT.equalsIgnoreCase(name)) {
+            addressSettings.setManagementMessageAttributeSizeLimit(XMLUtil.parseInt(child));
          } else if (DEFAULT_PURGE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) {
             addressSettings.setDefaultPurgeOnNoConsumers(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_MAX_CONSUMERS.equalsIgnoreCase(name)) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 4e56212..67f7549 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -788,11 +788,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
     * @return
     */
    private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) throws ActiveMQException {
+      final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
       Map<String, Object>[] messages = new Map[refs.size()];
       int i = 0;
       for (MessageReference ref : refs) {
          Message message = ref.getMessage();
-         messages[i++] = message.toMap();
+         messages[i++] = message.toMap(attributeSizeLimit);
       }
       return messages;
    }
@@ -847,7 +848,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          Filter filter = FilterImpl.createFilter(filterStr);
          List<Map<String, Object>> messages = new ArrayList<>();
          queue.flushExecutor();
-         final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
+         final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+         final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
+         final int limit = addressSettings.getManagementBrowsePageSize();
          int count = 0;
          try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
             try {
@@ -855,7 +858,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                   MessageReference ref = iterator.next();
                   if (filter == null || filter.match(ref.getMessage())) {
                      Message message = ref.getMessage();
-                     messages.add(message.toMap());
+                     messages.add(message.toMap(attributeSizeLimit));
                   }
                }
             } catch (NoSuchElementException ignored) {
@@ -895,12 +898,13 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       try {
          List<Map<String, Object>> messages = new ArrayList<>();
          queue.flushExecutor();
+         final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
          try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
             // returns just the first, as it's the first only
             if (iterator.hasNext()) {
                MessageReference ref = iterator.next();
                Message message = ref.getMessage();
-               messages.add(message.toMap());
+               messages.add(message.toMap(attributeSizeLimit));
             }
             return messages.toArray(new Map[1]);
          }
@@ -985,7 +989,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          if (filter == null && groupByProperty == null) {
             result.put(null, getMessageCount());
          } else {
-            final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
+            final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
             int count = 0;
             try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
                try {
@@ -1552,13 +1556,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          Filter thefilter = FilterImpl.createFilter(filter);
          queue.flushExecutor();
 
+         final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
          try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
             try {
                while (iterator.hasNext() && index < end) {
                   MessageReference ref = iterator.next();
                   if (thefilter == null || thefilter.match(ref.getMessage())) {
                      if (index >= start) {
-                        c.add(OpenTypeSupport.convert(ref));
+                        c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
                      }
                   }
                   index++;
@@ -1597,7 +1602,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
       clearIO();
       try {
-         int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
+         final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+         final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
+         final int limit = addressSettings.getManagementBrowsePageSize();
          int currentPageSize = 0;
          ArrayList<CompositeData> c = new ArrayList<>();
          Filter thefilter = FilterImpl.createFilter(filter);
@@ -1607,7 +1614,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                while (iterator.hasNext() && currentPageSize++ < limit) {
                   MessageReference ref = iterator.next();
                   if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     c.add(OpenTypeSupport.convert(ref));
+                     c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
 
                   }
                }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index 027bbdb..7c5bd65 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -46,7 +47,7 @@ public final class OpenTypeSupport {
    private OpenTypeSupport() {
    }
 
-   public static CompositeData convert(MessageReference ref) throws OpenDataException {
+   public static CompositeData convert(MessageReference ref, int valueSizeLimit) throws OpenDataException {
       CompositeType ct;
 
       ICoreMessage message = ref.getMessage().toCore();
@@ -57,11 +58,11 @@ public final class OpenTypeSupport {
       switch(type) {
          case Message.TEXT_TYPE:
             ct = TEXT_FACTORY.getCompositeType();
-            fields = TEXT_FACTORY.getFields(ref);
+            fields = TEXT_FACTORY.getFields(ref, valueSizeLimit);
             break;
          default:
             ct = BYTES_FACTORY.getCompositeType();
-            fields = BYTES_FACTORY.getFields(ref);
+            fields = BYTES_FACTORY.getFields(ref, valueSizeLimit);
             break;
       }
       return new CompositeDataSupport(ct, fields);
@@ -82,6 +83,7 @@ public final class OpenTypeSupport {
       protected TabularType longPropertyTabularType;
       protected TabularType floatPropertyTabularType;
       protected TabularType doublePropertyTabularType;
+      protected Object[][] typedPropertyFields;
 
       protected String getTypeName() {
          return Message.class.getName();
@@ -129,9 +131,21 @@ public final class OpenTypeSupport {
          addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
          addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
          addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
+
+         typedPropertyFields = new Object[][] {
+            {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class},
+            {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class},
+            {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class},
+            {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class},
+            {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class},
+            {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class},
+            {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class},
+            {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class}
+         };
+
       }
 
-      public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
+      public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
          Map<String, Object> rc = new HashMap<>();
          ICoreMessage m = ref.getMessage().toCore();
          rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
@@ -154,49 +168,23 @@ public final class OpenTypeSupport {
             rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
          }
 
-         Map<String, Object> propertyMap = m.toPropertyMap();
+         Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
 
-         rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
+         rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
 
-         try {
-            rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(propertyMap, stringPropertyTabularType, String.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(propertyMap, booleanPropertyTabularType, Boolean.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(propertyMap, bytePropertyTabularType, Byte.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(propertyMap, shortPropertyTabularType, Short.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(propertyMap, intPropertyTabularType, Integer.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(propertyMap, longPropertyTabularType, Long.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(propertyMap, floatPropertyTabularType, Float.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType));
-         }
-         try {
-            rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(propertyMap, doublePropertyTabularType, Double.class));
-         } catch (IOException e) {
-            rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType));
+         // only populate if there are some values
+         TabularDataSupport tabularData;
+         for (Object[] typedPropertyInfo : typedPropertyFields) {
+            tabularData = null;
+            try {
+               tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
+            } catch (Exception ignored) {
+            }
+            if (tabularData != null && !tabularData.isEmpty()) {
+               rc.put((String) typedPropertyInfo[0], tabularData);
+            } else {
+               rc.put((String) typedPropertyInfo[0], null);
+            }
          }
          return rc;
       }
@@ -273,14 +261,14 @@ public final class OpenTypeSupport {
       }
 
       @Override
-      public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
-         Map<String, Object> rc = super.getFields(ref);
+      public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
          ICoreMessage m = ref.getMessage().toCore();
          if (!m.isLargeMessage()) {
             ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
-            byte[] bytes = new byte[bodyCopy.readableBytes()];
+            byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1];
             bodyCopy.readBytes(bytes);
-            rc.put(CompositeDataConstants.BODY, bytes);
+            rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
          } else {
             rc.put(CompositeDataConstants.BODY, new byte[0]);
          }
@@ -298,15 +286,15 @@ public final class OpenTypeSupport {
       }
 
       @Override
-      public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
-         Map<String, Object> rc = super.getFields(ref);
+      public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
          ICoreMessage m = ref.getMessage().toCore();
          if (!m.isLargeMessage()) {
             if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
                rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
             } else {
-               SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
-               rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
+               final String text = m.getReadOnlyBodyBuffer().readString();
+               rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text, valueSizeLimit) : "");
             }
          } else {
             rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 567f679..44aa299 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -127,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final boolean DEFAULT_ENABLE_METRICS = true;
 
+   public static final int MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = 256;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -253,6 +255,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Boolean enableMetrics = null;
 
+   private Integer managementMessageAttributeSizeLimit = null;
+
    //from amq5
    //make it transient
    private transient Integer queuePrefetch = null;
@@ -318,6 +322,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.defaultGroupFirstKey = other.defaultGroupFirstKey;
       this.defaultRingSize = other.defaultRingSize;
       this.enableMetrics = other.enableMetrics;
+      this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
    }
 
    public AddressSettings() {
@@ -914,6 +919,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public int getManagementMessageAttributeSizeLimit() {
+      return managementMessageAttributeSizeLimit != null ? managementMessageAttributeSizeLimit : AddressSettings.MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT;
+   }
+
+   public AddressSettings setManagementMessageAttributeSizeLimit(int managementMessageAttributeSizeLimit) {
+      this.managementMessageAttributeSizeLimit = managementMessageAttributeSizeLimit;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -1029,6 +1043,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (managementBrowsePageSize == null) {
          managementBrowsePageSize = merged.managementBrowsePageSize;
       }
+      if (managementMessageAttributeSizeLimit == null) {
+         managementMessageAttributeSizeLimit = merged.managementMessageAttributeSizeLimit;
+      }
       if (queuePrefetch == null) {
          queuePrefetch = merged.queuePrefetch;
       }
@@ -1320,6 +1337,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
       }
 
+      if (buffer.readableBytes() > 0) {
+         managementMessageAttributeSizeLimit = BufferHelper.readNullableInteger(buffer);
+      }
+
    }
 
    @Override
@@ -1383,7 +1404,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          SimpleString.sizeofNullableString(expiryQueuePrefix) +
          SimpleString.sizeofNullableString(expiryQueueSuffix) +
          BufferHelper.sizeOfNullableBoolean(enableMetrics) +
-         BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch);
+         BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
+         BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit);
    }
 
    @Override
@@ -1510,6 +1532,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
 
+      BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
    }
 
    /* (non-Javadoc)
@@ -1581,6 +1604,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode());
       result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode());
       result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
+      result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
       return result;
    }
 
@@ -1796,6 +1820,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
             return false;
       } else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
          return false;
+      if (managementMessageAttributeSizeLimit == null) {
+         if (other.managementMessageAttributeSizeLimit != null)
+            return false;
+      } else if (!managementMessageAttributeSizeLimit.equals(other.managementMessageAttributeSizeLimit))
+         return false;
       if (queuePrefetch == null) {
          if (other.queuePrefetch != null)
             return false;
@@ -2017,6 +2046,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          configDeleteAddresses +
          ", managementBrowsePageSize=" +
          managementBrowsePageSize +
+         ", managementMessageAttributeSizeLimit=" +
+         managementMessageAttributeSizeLimit +
          ", defaultMaxConsumers=" +
          defaultMaxConsumers +
          ", defaultPurgeOnNoConsumers=" +
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
index e0ea891..259d1bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
@@ -56,6 +56,6 @@ public class CoreTransactionDetail extends TransactionDetail {
 
    @Override
    public Map<String, Object> decodeMessageProperties(Message msg) {
-      return msg.toMap();
+      return msg.toMap(256);
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 6788727..fed309a 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3804,7 +3804,15 @@
             <xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
-                     how many message a management resource can browse
+                     how many messages a management resource can browse, list or filter
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="management-message-attribute-size-limit" type="xsd:int" default="256" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the size limit of any message attribute value returned from a browse, list or filter. Attribute values that exceed the limit will be truncated
                   </xsd:documentation>
                </xsd:annotation>
             </xsd:element>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 8d5560d..b6ed421 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -505,6 +505,8 @@
             <default-consumer-window-size>10000</default-consumer-window-size>
             <retroactive-message-count>10</retroactive-message-count>
             <enable-metrics>false</enable-metrics>
+            <management-browse-page-size>400</management-browse-page-size>
+            <management-message-attribute-size-limit>265</management-message-attribute-size-limit>
          </address-setting>
       </address-settings>
       <resource-limit-settings>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
index 128636a..2429cc6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -39,6 +39,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.json.JsonArray;
 import javax.json.JsonObject;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 public class JMXManagementTest extends JMSClientTestSupport {
@@ -107,8 +108,11 @@ public class JMXManagementTest extends JMSClientTestSupport {
          session.begin();
          AmqpMessage message = new AmqpMessage();
          message.setApplicationProperty("TEST_BINARY", new Binary("TEST".getBytes()));
-         message.setApplicationProperty("TEST_STRING", "TEST");
-         message.setText("TEST");
+
+         final String oneK = new String(new char[1024]).replace("\0", "$");
+         message.setApplicationProperty("TEST_BIG_BINARY", new Binary(oneK.getBytes(StandardCharsets.UTF_8)));
+         message.setApplicationProperty("TEST_STRING", oneK);
+         message.setText("NOT_VISIBLE");
          sender.send(message);
          session.commit();
 
@@ -116,6 +120,18 @@ public class JMXManagementTest extends JMSClientTestSupport {
          QueueControl queueControl = createManagementControl(queue, queue);
          String firstMessageAsJSON = queueControl.getFirstMessageAsJSON();
          Assert.assertNotNull(firstMessageAsJSON);
+
+         // Json is still bulky!
+         Assert.assertTrue(firstMessageAsJSON.length() < 1500);
+         Assert.assertFalse(firstMessageAsJSON.contains("NOT_VISIBLE"));
+
+         // composite data limits
+         Map<String, Object>[] result = queueControl.listMessages("");
+         assertEquals(1, result.length);
+
+         final Map<String, Object> msgMap = result[0];
+         Assert.assertTrue(msgMap.get("TEST_STRING").toString().length() < 512);
+
       } finally {
          connection.close();
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index b04b845..457c379 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -489,6 +489,122 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testMessageAttributeLimits() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(100);
+      server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+
+      session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
+
+      byte[] twoKBytes = new byte[2048];
+      for (int i = 0; i < 2048; i++) {
+         twoKBytes[i] = '*';
+      }
+
+      String twoKString = new String(twoKBytes);
+
+      ClientMessage clientMessage = session.createMessage(false);
+
+      clientMessage.putStringProperty("y", "valueY");
+      clientMessage.putStringProperty("bigString", twoKString);
+      clientMessage.putBytesProperty("bigBytes", twoKBytes);
+      clientMessage.putObjectProperty("bigObject", twoKString);
+
+      clientMessage.getBodyBuffer().writeBytes(twoKBytes);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, getMessageCount(queueControl));
+
+      ClientProducer producer = session.createProducer(address);
+      producer.send(clientMessage);
+
+      Wait.assertEquals(1, () -> getMessageCount(queueControl));
+
+      assertTrue(server.getPagingManager().getPageStore(address).getAddressSize() > 2048);
+
+      Map<String, Object>[] messages = queueControl.listMessages("");
+      assertEquals(1, messages.length);
+      for (String key : messages[0].keySet()) {
+         Object value = messages[0].get(key);
+         System.err.println( key + " " + value);
+         assertTrue(value.toString().length() <= 150);
+
+         if (value instanceof byte[]) {
+            assertTrue(((byte[])value).length <= 150);
+         }
+      }
+
+      String all = queueControl.listMessagesAsJSON("");
+      assertTrue(all.length() < 1024);
+
+      String first = queueControl.getFirstMessageAsJSON();
+      assertTrue(first.length() < 1024);
+
+      CompositeData[] browseResult = queueControl.browse(1, 100);
+      for (CompositeData compositeData : browseResult) {
+         for (String key : compositeData.getCompositeType().keySet()) {
+            Object value = compositeData.get(key);
+            System.err.println("" + key + ", " + value);
+
+            if (value != null) {
+
+               if (key.equals("StringProperties")) {
+                  // these are very verbose composite data structures
+                  assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 2048);
+               } else {
+                  assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 512);
+               }
+
+               if (value instanceof byte[]) {
+                  assertTrue("truncated? " + key, ((byte[]) value).length <= 150);
+               }
+            }
+         }
+      }
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testTextMessageAttributeLimits() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(10);
+      server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+
+      session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
+
+      final String twentyBytes = new String(new char[20]).replace("\0", "#");
+
+      ClientMessage clientMessage = createTextMessage(session, twentyBytes, true);
+      clientMessage.putStringProperty("x", twentyBytes);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, getMessageCount(queueControl));
+
+      ClientProducer producer = session.createProducer(address);
+      producer.send(clientMessage);
+
+      Wait.assertEquals(1, () -> getMessageCount(queueControl));
+
+      Map<String, Object>[] messages = queueControl.listMessages("");
+      assertEquals(1, messages.length);
+      assertTrue("truncated? ", ((String)messages[0].get("x")).contains("more"));
+
+      CompositeData[] browseResult = queueControl.browse(1, 100);
+      for (CompositeData compositeData : browseResult) {
+         for (String key : new String[] {"text", "PropertiesText", "StringProperties"}) {
+            assertTrue("truncated? : " + key, compositeData.get(key).toString().contains("more"));
+         }
+      }
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
    public void testGetMessagesAdded() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
@@ -1082,7 +1198,7 @@ public class QueueControlTest extends ManagementTestBase {
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable).putBytesProperty("bytes", new byte[]{'%'}));
       producer.send(session.createMessage(durable));
 
       Wait.assertEquals(2, () -> queueControl.listMessages(null).length);