You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/01/17 08:33:55 UTC

[1/6] activemq-artemis git commit: ARTEMIS-1586 Added String Pools unit tests

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 00bd989f9 -> 9fb8c3c47


ARTEMIS-1586 Added String Pools unit tests

- SimpleString::toSimpleString String pooling test
- ByteBufSimpleStringPool test
- StringSimpleStringPool test
- ByteBufStringValuePool test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a3c41818
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a3c41818
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a3c41818

Branch: refs/heads/master
Commit: a3c418183aa7f8e7fe92d4db551b9b15777aabb7
Parents: 98028cd
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Jan 15 13:56:46 2018 +0100
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100

----------------------------------------------------------------------
 .../artemis/utils/TypedPropertiesTest.java      | 29 ++++++++++++
 .../artemis/tests/util/SimpleStringTest.java    | 49 ++++++++++++++++++++
 2 files changed, 78 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a3c41818/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 38144c9..ea044ac 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.utils;
 
 import java.util.Iterator;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -226,4 +228,31 @@ public class TypedPropertiesTest {
       props = new TypedProperties();
       key = RandomUtil.randomSimpleString();
    }
+
+   @Test
+   public void testByteBufStringValuePool() {
+      final int capacity = 8;
+      final int chars = Integer.toString(capacity).length();
+      final TypedProperties.StringValue.ByteBufStringValuePool pool = new TypedProperties.StringValue.ByteBufStringValuePool(capacity, chars);
+      final int bytes = new SimpleString(Integer.toString(capacity)).sizeof();
+      final ByteBuf bb = Unpooled.buffer(bytes, bytes);
+      for (int i = 0; i < capacity; i++) {
+         final SimpleString s = new SimpleString(Integer.toString(i));
+         bb.resetWriterIndex();
+         SimpleString.writeSimpleString(bb, s);
+         bb.resetReaderIndex();
+         final TypedProperties.StringValue expectedPooled = pool.getOrCreate(bb);
+         bb.resetReaderIndex();
+         Assert.assertSame(expectedPooled, pool.getOrCreate(bb));
+      }
+   }
+
+   @Test
+   public void testByteBufStringValuePoolTooLong() {
+      final SimpleString tooLong = new SimpleString("aa");
+      final ByteBuf bb = Unpooled.buffer(tooLong.sizeof(), tooLong.sizeof());
+      SimpleString.writeSimpleString(bb, tooLong);
+      final TypedProperties.StringValue.ByteBufStringValuePool pool = new TypedProperties.StringValue.ByteBufStringValuePool(1, tooLong.length() - 1);
+      Assert.assertNotSame(pool.getOrCreate(bb), pool.getOrCreate(bb.resetReaderIndex()));
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a3c41818/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
index b367015..054b186 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.util;
 
 import java.util.concurrent.CountDownLatch;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -394,4 +396,51 @@ public class SimpleStringTest extends Assert {
       }
    }
 
+   @Test
+   public void testToSimpleStringPoolStringArgument() throws Exception {
+      final String s = "pooled";
+      final SimpleString ss = SimpleString.toSimpleString(s);
+      final String s1 = ss.toString();
+      Assert.assertSame("SimpleString::toSimpleString is not pooling the given String", s, s1);
+   }
+
+   @Test
+   public void testByteBufSimpleStringPool() {
+      final int capacity = 8;
+      final int chars = Integer.toString(capacity).length();
+      final SimpleString.ByteBufSimpleStringPool pool = new SimpleString.ByteBufSimpleStringPool(capacity, chars);
+      final int bytes = new SimpleString(Integer.toString(capacity)).sizeof();
+      final ByteBuf bb = Unpooled.buffer(bytes, bytes);
+      for (int i = 0; i < capacity; i++) {
+         final SimpleString s = new SimpleString(Integer.toString(i));
+         bb.resetWriterIndex();
+         SimpleString.writeSimpleString(bb, s);
+         bb.resetReaderIndex();
+         final SimpleString expectedPooled = pool.getOrCreate(bb);
+         bb.resetReaderIndex();
+         Assert.assertSame(expectedPooled, pool.getOrCreate(bb));
+      }
+   }
+
+   @Test
+   public void testByteBufSimpleStringPoolTooLong() {
+      final SimpleString tooLong = new SimpleString("aa");
+      final ByteBuf bb = Unpooled.buffer(tooLong.sizeof(), tooLong.sizeof());
+      SimpleString.writeSimpleString(bb, tooLong);
+      final SimpleString.ByteBufSimpleStringPool pool = new SimpleString.ByteBufSimpleStringPool(1, tooLong.length() - 1);
+      Assert.assertNotSame(pool.getOrCreate(bb), pool.getOrCreate(bb.resetReaderIndex()));
+   }
+
+   @Test
+   public void testStringSimpleStringPool() throws Exception {
+      final int capacity = 8;
+      final SimpleString.StringSimpleStringPool pool = new SimpleString.StringSimpleStringPool(capacity);
+      for (int i = 0; i < capacity; i++) {
+         final String s = Integer.toString(i);
+         final SimpleString expectedPooled = pool.getOrCreate(s);
+         Assert.assertSame(expectedPooled, pool.getOrCreate(s));
+      }
+   }
+
+
 }


[6/6] activemq-artemis git commit: This closes #1757 ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol

Posted by mi...@apache.org.
This closes #1757 ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9fb8c3c4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9fb8c3c4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9fb8c3c4

Branch: refs/heads/master
Commit: 9fb8c3c470eb9a27fccf286314f5083082c4ba89
Parents: 00bd989 a3c4181
Author: Michael Pearce <mi...@me.com>
Authored: Wed Jan 17 09:33:42 2018 +0100
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:42 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 121 ++++++++++++--
 .../artemis/utils/AbstractByteBufPool.java      | 164 +++++++++++++++++++
 .../activemq/artemis/utils/AbstractPool.java    |  89 ++++++++++
 .../apache/activemq/artemis/utils/ByteUtil.java | 122 ++++++++++++++
 .../utils/collections/TypedProperties.java      | 110 ++++++++++++-
 .../artemis/utils/TypedPropertiesTest.java      |  29 ++++
 .../activemq/artemis/api/core/Message.java      |   6 +
 .../core/client/impl/ClientMessageImpl.java     |  23 ++-
 .../core/client/impl/ClientSessionImpl.java     |   5 +-
 .../artemis/core/message/impl/CoreMessage.java  |  91 ++++++----
 .../message/impl/CoreMessageObjectPools.java    |  55 +++++++
 .../core/protocol/ClientPacketDecoder.java      |   7 +-
 .../impl/ActiveMQClientProtocolManager.java     |   6 +-
 .../activemq/artemis/reader/MessageUtil.java    |  10 +-
 .../artemis/message/CoreMessageTest.java        |   2 +-
 .../artemis/jms/client/ActiveMQDestination.java |  20 ++-
 .../artemis/jms/client/ActiveMQMessage.java     |  55 ++++---
 .../artemis/jms/client/ActiveMQQueue.java       |   8 +-
 .../artemis/jms/client/ActiveMQSession.java     |   6 +-
 .../jms/client/ActiveMQStreamMessage.java       |   2 +-
 .../artemis/jms/client/ActiveMQTopic.java       |   8 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  74 ++++++---
 .../amqp/broker/AMQPSessionCallback.java        | 105 ++++++------
 .../protocol/amqp/converter/AMQPConverter.java  |   5 +-
 .../amqp/converter/AMQPMessageSupport.java      |  49 +++---
 .../amqp/converter/AmqpCoreConverter.java       |  37 +++--
 .../proton/ProtonServerReceiverContext.java     |  16 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  54 +++---
 .../JMSMappingOutboundTransformerTest.java      |   4 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java |   8 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |   2 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  19 ++-
 .../protocol/openwire/OpenWireConnection.java   |   2 +-
 .../openwire/OpenWireMessageConverter.java      |   8 +-
 .../core/protocol/openwire/OpenwireMessage.java |  11 ++
 .../core/protocol/openwire/amq/AMQConsumer.java |   2 +-
 .../core/protocol/openwire/amq/AMQSession.java  |  11 +-
 .../core/protocol/stomp/StompSession.java       |   2 +-
 .../ra/inflow/ActiveMQMessageHandler.java       |   2 +-
 .../core/protocol/ServerPacketDecoder.java      |   8 +-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 .../protocol/core/impl/CoreSessionCallback.java |  11 +-
 ...ctiveMQServerSideProtocolManagerFactory.java |   4 +-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 .../spi/core/protocol/MessageConverter.java     |   3 +-
 .../spi/core/protocol/SessionCallback.java      |   2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  11 ++
 .../artemis/tests/util/SimpleStringTest.java    |  49 ++++++
 .../integration/client/AcknowledgeTest.java     |  11 ++
 .../integration/client/HangConsumerTest.java    |   2 +-
 .../persistence/XmlImportExportTest.java        |   2 +-
 51 files changed, 1157 insertions(+), 300 deletions(-)
----------------------------------------------------------------------



[5/6] activemq-artemis git commit: ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol

Posted by mi...@apache.org.
ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol

The commit contains:
- a general purpose interner implementation
- StringValue/SimpleString internrs specializations
- TypedProperties keys/values string interning for SessionSendMessage decoding


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d776edd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d776edd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d776edd

Branch: refs/heads/master
Commit: 8d776eddfcc12bfc73771c04e376583c9fa221e1
Parents: 00bd989
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Jan 4 15:22:05 2018 +0100
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 138 +++++++++++++++-
 .../artemis/utils/AbstractInterner.java         | 157 +++++++++++++++++++
 .../utils/collections/TypedProperties.java      |  60 ++++++-
 .../artemis/core/message/impl/CoreMessage.java  |  36 ++++-
 .../core/protocol/ClientPacketDecoder.java      |   4 +
 .../impl/ActiveMQClientProtocolManager.java     |   4 +-
 .../core/protocol/ServerPacketDecoder.java      |  29 +++-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 ...ctiveMQServerSideProtocolManagerFactory.java |   4 +-
 9 files changed, 404 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 79909c7..e24e245 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.utils.AbstractInterner;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -31,6 +33,129 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
 
+   public static final class Interner extends AbstractInterner<SimpleString> {
+
+      private final int maxLength;
+
+      public Interner(final int capacity, final int maxCharsLength) {
+         super(capacity);
+         this.maxLength = maxCharsLength;
+      }
+
+      @Override
+      protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
+         return SimpleString.isEqual(entry, byteBuf, offset, length);
+      }
+
+      @Override
+      protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+         assert length % 2 == 0 : "length must be a multiple of 2";
+         final int expectedStringLength = length >> 1;
+         return expectedStringLength <= maxLength;
+      }
+
+      @Override
+      protected SimpleString create(final ByteBuf byteBuf, final int length) {
+         return readSimpleString(byteBuf, length);
+      }
+   }
+
+   /**
+    * Returns {@code true} if  the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
+    * {@code false} otherwise.
+    * <p>
+    * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+    * length field.
+    */
+   public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) {
+      if (s == null) {
+         return false;
+      }
+      final byte[] chars = s.getData();
+      if (chars.length != length)
+         return false;
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         if ((offset + length) > bytes.writerIndex()) {
+            throw new IndexOutOfBoundsException();
+         }
+         if (bytes.hasArray()) {
+            return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length);
+         } else if (bytes.hasMemoryAddress()) {
+            return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufIsEqual(chars, bytes, offset, length);
+   }
+
+   private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) {
+      for (int i = 0; i < length; i++)
+         if (chars[i] != bytes.getByte(offset + i))
+            return false;
+      return true;
+   }
+
+   private static boolean batchOnHeapIsEqual(final byte[] chars,
+                                             final byte[] array,
+                                             final int arrayOffset,
+                                             final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      int bytesIndex = arrayOffset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesIndex += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesIndex++;
+         charsIndex++;
+      }
+      return true;
+   }
+
+   private static boolean batchOffHeapIsEqual(final byte[] chars,
+                                              final long address,
+                                              final int offset,
+                                              final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      long bytesAddress = address + offset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(bytesAddress);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesAddress += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(bytesAddress);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesAddress++;
+         charsIndex++;
+      }
+      return true;
+   }
+
    private static final long serialVersionUID = 4204223851422244307L;
 
    // Attributes
@@ -134,7 +259,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       return subSeq(start, end);
    }
 
-
    public static SimpleString readNullableSimpleString(ByteBuf buffer) {
       int b = buffer.readByte();
       if (b == DataConstants.NULL) {
@@ -143,13 +267,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       return readSimpleString(buffer);
    }
 
-
    public static SimpleString readSimpleString(ByteBuf buffer) {
       int len = buffer.readInt();
-      if (len > buffer.readableBytes()) {
-         throw new IndexOutOfBoundsException();
-      }
-      byte[] data = new byte[len];
+      return readSimpleString(buffer, len);
+   }
+
+   public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
+      byte[] data = new byte[length];
       buffer.readBytes(data);
       return new SimpleString(data);
    }
@@ -169,8 +293,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       buffer.writeBytes(data);
    }
 
-
-
    public SimpleString subSeq(final int start, final int end) {
       int len = data.length >> 1;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
new file mode 100644
index 0000000..7e1fe40
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractInterner<T> {
+
+   private final T[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractInterner(final int capacity) {
+      entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Batch hash code implementation that works at its best if {@code bytes}
+    * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
+    */
+   private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //if the platform allows it, the hash code could be computed without bounds checking
+         if (bytes.hasArray()) {
+            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
+         } else if (bytes.hasMemoryAddress()) {
+            return offHeapHashCode(bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufHashCode(bytes, offset, length);
+   }
+
+   private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int offHeapHashCode(final long address, final int offset, final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         final short shortLE = byteBuf.getShortLE(arrayIndex);
+         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
+         hashCode = 31 * hashCode + nativeShort;
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   /**
+    * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned,
+    * {@code false} otherwise.
+    */
+   protected abstract boolean canIntern(ByteBuf byteBuf, int length);
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract T create(ByteBuf byteBuf, int length);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
+    * and {@code length} {@code false} otherwise.
+    */
+   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
+
+   /**
+    * Returns and interned entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
+    */
+   public final T intern(final ByteBuf byteBuf, final int length) {
+      if (!canIntern(byteBuf, length)) {
+         return create(byteBuf, length);
+      } else {
+         if (!byteBuf.isReadable(length)) {
+            throw new IndexOutOfBoundsException();
+         }
+         final int bytesOffset = byteBuf.readerIndex();
+         final int hashCode = hashCode(byteBuf, bytesOffset, length);
+         //fast % operation with power of 2 entries.length
+         final int firstIndex = hashCode & mask;
+         final T firstEntry = entries[firstIndex];
+         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return firstEntry;
+         }
+         final int secondIndex = (hashCode >> shift) & mask;
+         final T secondEntry = entries[secondIndex];
+         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return secondEntry;
+         }
+         final T internedEntry = create(byteBuf, length);
+         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+         entries[entryIndex] = internedEntry;
+         return internedEntry;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index b17156e..a3e4876 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
+import org.apache.activemq.artemis.utils.AbstractInterner;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -94,6 +95,7 @@ public class TypedProperties {
 
    public void putByteProperty(final SimpleString key, final byte value) {
       checkCreateProperties();
+      checkCreateProperties();
       doPutValue(key, ByteValue.valueOf(value));
    }
 
@@ -329,7 +331,9 @@ public class TypedProperties {
       }
    }
 
-   public synchronized void decode(final ByteBuf buffer) {
+   public synchronized void decode(final ByteBuf buffer,
+                                   final SimpleString.Interner keyInterner,
+                                   final StringValue.Interner valueInterner) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -342,10 +346,15 @@ public class TypedProperties {
          size = 0;
 
          for (int i = 0; i < numHeaders; i++) {
+            final SimpleString key;
             int len = buffer.readInt();
-            byte[] data = new byte[len];
-            buffer.readBytes(data);
-            SimpleString key = new SimpleString(data);
+            if (keyInterner != null) {
+               key = keyInterner.intern(buffer, len);
+            } else {
+               byte[] data = new byte[len];
+               buffer.readBytes(data);
+               key = new SimpleString(data);
+            }
 
             byte type = buffer.readByte();
 
@@ -403,7 +412,12 @@ public class TypedProperties {
                   break;
                }
                case STRING: {
-                  val = new StringValue(buffer);
+                  if (valueInterner != null) {
+                     final int length = buffer.readInt();
+                     val = valueInterner.intern(buffer, length);
+                  } else {
+                     val = new StringValue(buffer);
+                  }
                   doPutValue(key, val);
                   break;
                }
@@ -415,6 +429,10 @@ public class TypedProperties {
       }
    }
 
+   public synchronized void decode(final ByteBuf buffer) {
+      decode(buffer, null, null);
+   }
+
    public synchronized void encode(final ByteBuf buffer) {
       if (properties == null) {
          buffer.writeByte(DataConstants.NULL);
@@ -881,7 +899,37 @@ public class TypedProperties {
       }
    }
 
-   private static final class StringValue extends PropertyValue {
+   public static final class StringValue extends PropertyValue {
+
+      public static final class Interner extends AbstractInterner<StringValue> {
+
+         private final int maxLength;
+
+         public Interner(final int capacity, final int maxCharsLength) {
+            super(capacity);
+            this.maxLength = maxCharsLength;
+         }
+
+         @Override
+         protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
+            if (entry == null) {
+               return false;
+            }
+            return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+         }
+
+         @Override
+         protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+            assert length % 2 == 0 : "length must be a multiple of 2";
+            final int expectedStringLength = length >> 1;
+            return expectedStringLength <= maxLength;
+         }
+
+         @Override
+         protected StringValue create(final ByteBuf byteBuf, final int length) {
+            return new StringValue(SimpleString.readSimpleString(byteBuf, length));
+         }
+      }
 
       final SimpleString val;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index b0656b6..4ebf97e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -42,8 +43,6 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-
 /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
  *  consumers */
 public class CoreMessage extends RefCountMessage implements ICoreMessage {
@@ -94,7 +93,18 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    protected volatile TypedProperties properties;
 
+   private final SimpleString.Interner keysInterner;
+   private final TypedProperties.StringValue.Interner valuesInterner;
+
+   public CoreMessage(final SimpleString.Interner keysInterner,
+                      final TypedProperties.StringValue.Interner valuesInterner) {
+      this.keysInterner = keysInterner;
+      this.valuesInterner = valuesInterner;
+   }
+
    public CoreMessage() {
+      this.keysInterner = null;
+      this.valuesInterner = null;
    }
 
    /** On core there's no delivery annotation */
@@ -318,6 +328,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage(long id, int bufferSize) {
       this.initBuffer(bufferSize);
       this.setMessageID(id);
+      this.keysInterner = null;
+      this.valuesInterner = null;
    }
 
    protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -331,6 +343,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       this.timestamp = other.timestamp;
       this.priority = other.priority;
       this.userID = other.userID;
+      this.keysInterner = other.keysInterner;
+      this.valuesInterner = other.valuesInterner;
       if (copyProperties != null) {
          this.properties = new TypedProperties(copyProperties);
       }
@@ -464,7 +478,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       if (properties == null) {
          TypedProperties properties = new TypedProperties();
          if (buffer != null && propertiesLocation >= 0) {
-            properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+            final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
+            properties.decode(byteBuf, keysInterner, valuesInterner);
          }
          this.properties = properties;
       }
@@ -528,8 +543,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
-
-      address = SimpleString.readNullableSimpleString(buffer);
+      int b = buffer.readByte();
+      if (b != DataConstants.NULL) {
+         final int length = buffer.readInt();
+         if (keysInterner != null) {
+            address = keysInterner.intern(buffer, length);
+         } else {
+            address = SimpleString.readSimpleString(buffer, length);
+         }
+      } else {
+         address = null;
+      }
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -547,7 +571,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties();
-         properties.decode(buffer);
+         properties.decode(buffer, keysInterner, valuesInterner);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 1022030..787e499 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -34,6 +34,10 @@ public class ClientPacketDecoder extends PacketDecoder {
    private static final long serialVersionUID = 6952614096979334582L;
    public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
 
+   protected ClientPacketDecoder() {
+
+   }
+
    @Override
    public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
       final byte packetType = in.readByte();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 93432b8..f0005ff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -409,7 +409,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
                                      List<Interceptor> incomingInterceptors,
                                      List<Interceptor> outgoingInterceptors,
                                      TopologyResponseHandler topologyResponseHandler) {
-      this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
+      this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
 
       this.topologyResponseHandler = topologyResponseHandler;
 
@@ -510,7 +510,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
       }
    }
 
-   protected PacketDecoder getPacketDecoder() {
+   protected PacketDecoder createPacketDecoder() {
       return ClientPacketDecoder.INSTANCE;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 0584476..2276fdb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -83,16 +85,34 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ServerPacketDecoder extends ClientPacketDecoder {
 
+   private static final int UUID_LENGTH = 36;
+   private static final int DEFAULT_INTERNER_CAPACITY = 32;
    private static final long serialVersionUID = 3348673114388400766L;
-   public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
+   private SimpleString.Interner keysInterner;
+   private TypedProperties.StringValue.Interner valuesInterner;
 
-   private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
+   public ServerPacketDecoder() {
+      this.keysInterner = null;
+      this.valuesInterner = null;
+   }
+
+   private void initializeInternersIfNeeded() {
+      if (this.keysInterner == null) {
+         this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+      }
+      if (this.valuesInterner == null) {
+         this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+      }
+   }
+
+   private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
       final SessionSendMessage sendMessage;
 
+      initializeInternersIfNeeded();
       if (connection.isVersionBeforeAddressChange()) {
-         sendMessage = new SessionSendMessage_1X(new CoreMessage());
+         sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
       } else {
-         sendMessage = new SessionSendMessage(new CoreMessage());
+         sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
       }
 
       sendMessage.decode(in);
@@ -259,5 +279,4 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
 
       return packet;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index c9262fa..af9e131 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
 
       Executor connectionExecutor = server.getExecutorFactory().getExecutor();
 
-      final CoreRemotingConnection rc = new RemotingConnectionImpl(ServerPacketDecoder.INSTANCE, connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
 
       Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
index 85ad3a3..209f68f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
@@ -65,8 +65,8 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
    class ActiveMQReplicationProtocolManager extends ActiveMQClientProtocolManager {
 
       @Override
-      protected PacketDecoder getPacketDecoder() {
-         return ServerPacketDecoder.INSTANCE;
+      protected PacketDecoder createPacketDecoder() {
+         return new ServerPacketDecoder();
       }
    }
 }


[2/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more generic

Posted by mi...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 75faa97..57865b7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -154,7 +154,7 @@ public class AMQConsumer {
       }
       addressInfo.setInternal(internalAddress);
       if (isDurable) {
-         queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
+         queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName);
          QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
          if (result.isExists()) {
             // Already exists

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 64d1353..bca7eae 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -27,6 +27,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
@@ -82,6 +83,8 @@ public class AMQSession implements SessionCallback {
 
    private final OpenWireProtocolManager protocolManager;
 
+   private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -295,7 +298,7 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
+   public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
       // TODO Auto-generated method stub
 
    }
@@ -315,7 +318,7 @@ public class AMQSession implements SessionCallback {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
+      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
 
       originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
 
@@ -338,12 +341,12 @@ public class AMQSession implements SessionCallback {
 
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
-         SimpleString address = new SimpleString(dest.getPhysicalName());
+         SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
          org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
          coreMsg.setAddress(address);
 
          if (actualDestinations[i].isQueue()) {
-            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+            checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
             coreMsg.setRoutingType(RoutingType.ANYCAST);
          } else {
             coreMsg.setRoutingType(RoutingType.MULTICAST);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 18e0b10..0a12b47 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -239,7 +239,7 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
+   public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
       StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
       if (stompSubscription != null) {
          StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 9133cdf..f343ec9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -111,7 +111,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
       // Create the message consumer
       SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
       if (activation.isTopic() && spec.isSubscriptionDurable()) {
-         SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
+         SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName());
 
          QueueQuery subResponse = session.queueQuery(queueName);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 2276fdb..d38f45f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
-import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -85,34 +83,15 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ServerPacketDecoder extends ClientPacketDecoder {
 
-   private static final int UUID_LENGTH = 36;
-   private static final int DEFAULT_INTERNER_CAPACITY = 32;
    private static final long serialVersionUID = 3348673114388400766L;
-   private SimpleString.Interner keysInterner;
-   private TypedProperties.StringValue.Interner valuesInterner;
-
-   public ServerPacketDecoder() {
-      this.keysInterner = null;
-      this.valuesInterner = null;
-   }
-
-   private void initializeInternersIfNeeded() {
-      if (this.keysInterner == null) {
-         this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
-      }
-      if (this.valuesInterner == null) {
-         this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
-      }
-   }
 
    private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
       final SessionSendMessage sendMessage;
 
-      initializeInternersIfNeeded();
       if (connection.isVersionBeforeAddressChange()) {
-         sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
+         sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
       } else {
-         sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
+         sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
       }
 
       sendMessage.decode(in);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 8b281eb..f53d028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
@@ -48,6 +49,8 @@ public final class CoreSessionCallback implements SessionCallback {
 
    private ServerSessionPacketHandler handler;
 
+   private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    public CoreSessionCallback(String name,
                               ProtocolManager protocolManager,
                               Channel channel,
@@ -115,9 +118,9 @@ public final class CoreSessionCallback implements SessionCallback {
 
       Packet packet;
       if (channel.getConnection().isVersionBeforeAddressChange()) {
-         packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
+         packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
       } else {
-         packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
+         packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
       }
 
       int size = 0;
@@ -159,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
+   public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
       if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
          channel.send(new DisconnectConsumerMessage(consumerId.getID()));
       } else {
-         ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName);
+         ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 5dc1b93..15b1465 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1045,7 +1045,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    @Override
    public void disconnect() {
-      callback.disconnect(this, getQueue().getName().toString());
+      callback.disconnect(this, getQueue().getName());
    }
 
    public float getRate() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a440e31..2c81343 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -18,10 +18,11 @@ package org.apache.activemq.artemis.spi.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 
 public interface MessageConverter<ProtocolMessage extends Message> {
 
-   ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
+   ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception;
 
    ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ae1612f..c4a2dbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -81,7 +81,7 @@ public interface SessionCallback {
 
    void closed();
 
-   void disconnect(ServerConsumer consumerId, String queueName);
+   void disconnect(ServerConsumer consumerId, SimpleString queueName);
 
    boolean isWritable(ReadyListener callback, Object protocolContext);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2707190..5cfac12 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.Persister;
@@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
       @Override
       public CoreMessage toCore() {
+         return toCore(null);
+      }
+
+      @Override
+      public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
          return null;
       }
 
@@ -591,6 +597,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public Message putStringProperty(SimpleString key, String value) {
+         return null;
+      }
+
+      @Override
       public Message putStringProperty(String key, String value) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index d7c9855..078c397 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
 
       @Override
       public ICoreMessage toCore() {
+         return toCore(null);
+      }
+
+      @Override
+      public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
          return null;
       }
 
@@ -648,6 +654,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
+      public Message putStringProperty(SimpleString key, String value) {
+         return null;
+      }
+
+      @Override
       public Message putStringProperty(String key, String value) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 2f25480..dc57a12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void disconnect(ServerConsumer consumerId, String queueName) {
+      public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
          //To change body of implemented methods use File | Settings | File Templates.
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index 790ed82..aaf29b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
          msg.putStringProperty("myNonAsciiStringProperty", international.toString());
          msg.putStringProperty("mySpecialCharacters", special);
          msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i));
-         msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null);
+         msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null);
          producer.send(msg);
       }
 


[3/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more generic

Posted by mi...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 896a8ed..6e28c0e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -372,7 +372,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    public void setJMSReplyTo(final Destination dest) throws JMSException {
 
       if (dest == null) {
-         MessageUtil.setJMSReplyTo(message, null);
+         MessageUtil.setJMSReplyTo(message, (String) null);
          replyTo = null;
       } else {
          if (dest instanceof ActiveMQDestination == false) {
@@ -391,7 +391,7 @@ public class ActiveMQMessage implements javax.jms.Message {
          }
          ActiveMQDestination jbd = (ActiveMQDestination) dest;
 
-         MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress()));
+         MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
 
          replyTo = jbd;
       }
@@ -401,14 +401,15 @@ public class ActiveMQMessage implements javax.jms.Message {
    public Destination getJMSDestination() throws JMSException {
       if (dest == null) {
          SimpleString address = message.getAddressSimpleString();
-         String prefix = "";
-         if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
-            prefix = QUEUE_QUALIFIED_PREFIX;
+         if (address == null) {
+            dest = null;
+         } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
+            dest = ActiveMQDestination.createQueue(address);
          } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
-            prefix = TOPIC_QUALIFIED_PREFIX;
+            dest = ActiveMQDestination.createTopic(address);
+         } else {
+            dest = ActiveMQDestination.fromPrefixedName(address.toString());
          }
-
-         dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
       }
 
       return dest;
@@ -513,7 +514,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public boolean getBooleanProperty(final String name) throws JMSException {
       try {
-         return message.getBooleanProperty(new SimpleString(name));
+         return message.getBooleanProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -522,7 +523,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public byte getByteProperty(final String name) throws JMSException {
       try {
-         return message.getByteProperty(new SimpleString(name));
+         return message.getByteProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -531,7 +532,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public short getShortProperty(final String name) throws JMSException {
       try {
-         return message.getShortProperty(new SimpleString(name));
+         return message.getShortProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -544,7 +545,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
 
       try {
-         return message.getIntProperty(new SimpleString(name));
+         return message.getIntProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -557,7 +558,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
 
       try {
-         return message.getLongProperty(new SimpleString(name));
+         return message.getLongProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -566,7 +567,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public float getFloatProperty(final String name) throws JMSException {
       try {
-         return message.getFloatProperty(new SimpleString(name));
+         return message.getFloatProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -575,7 +576,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public double getDoubleProperty(final String name) throws JMSException {
       try {
-         return message.getDoubleProperty(new SimpleString(name));
+         return message.getDoubleProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -593,7 +594,7 @@ public class ActiveMQMessage implements javax.jms.Message {
          } else if (MessageUtil.JMSXUSERID.equals(name)) {
             return message.getValidatedUserID();
          } else {
-            return message.getStringProperty(new SimpleString(name));
+            return message.getStringProperty(name);
          }
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
@@ -608,7 +609,7 @@ public class ActiveMQMessage implements javax.jms.Message {
 
       Object val = message.getObjectProperty(name);
       if (val instanceof SimpleString) {
-         val = ((SimpleString) val).toString();
+         val = val.toString();
       }
       return val;
    }
@@ -622,43 +623,43 @@ public class ActiveMQMessage implements javax.jms.Message {
    public void setBooleanProperty(final String name, final boolean value) throws JMSException {
       checkProperty(name);
 
-      message.putBooleanProperty(new SimpleString(name), value);
+      message.putBooleanProperty(name, value);
    }
 
    @Override
    public void setByteProperty(final String name, final byte value) throws JMSException {
       checkProperty(name);
-      message.putByteProperty(new SimpleString(name), value);
+      message.putByteProperty(name, value);
    }
 
    @Override
    public void setShortProperty(final String name, final short value) throws JMSException {
       checkProperty(name);
-      message.putShortProperty(new SimpleString(name), value);
+      message.putShortProperty(name, value);
    }
 
    @Override
    public void setIntProperty(final String name, final int value) throws JMSException {
       checkProperty(name);
-      message.putIntProperty(new SimpleString(name), value);
+      message.putIntProperty(name, value);
    }
 
    @Override
    public void setLongProperty(final String name, final long value) throws JMSException {
       checkProperty(name);
-      message.putLongProperty(new SimpleString(name), value);
+      message.putLongProperty(name, value);
    }
 
    @Override
    public void setFloatProperty(final String name, final float value) throws JMSException {
       checkProperty(name);
-      message.putFloatProperty(new SimpleString(name), value);
+      message.putFloatProperty(name, value);
    }
 
    @Override
    public void setDoubleProperty(final String name, final double value) throws JMSException {
       checkProperty(name);
-      message.putDoubleProperty(new SimpleString(name), value);
+      message.putDoubleProperty(name, value);
    }
 
    @Override
@@ -670,7 +671,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
          return;
       } else {
-         message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value));
+         message.putStringProperty(name, value);
       }
    }
 
@@ -703,7 +704,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
 
       try {
-         message.putObjectProperty(new SimpleString(name), value);
+         message.putObjectProperty(name, value);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -964,7 +965,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       boolean result = false;
 
       if (jmsPropertyName.equals(name)) {
-         message.putStringProperty(corePropertyName, SimpleString.toSimpleString(value.toString()));
+         message.putStringProperty(corePropertyName, value.toString());
 
          result = true;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
index 2deefa9..ff4ee0f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
 
 import javax.jms.Queue;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+
 /**
  * ActiveMQ Artemis implementation of a JMS Queue.
  * <br>
@@ -34,13 +36,17 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
 
    // Constructors --------------------------------------------------
    public ActiveMQQueue() {
-      this(null);
+      this((SimpleString) null);
    }
 
    public ActiveMQQueue(final String address) {
       super(address, TYPE.QUEUE, null);
    }
 
+   public ActiveMQQueue(final SimpleString address) {
+      super(address, TYPE.QUEUE, null);
+   }
+
    public ActiveMQQueue(final String address, boolean temporary) {
       super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 374a985..cf2ec59 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
             throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
          }
 
-         queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
+         queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName);
 
          if (durability == ConsumerDurability.DURABLE) {
             try {
@@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                   throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
                }
 
-               queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName));
+               queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName);
 
                QueueQuery subResponse = session.queueQuery(queueName);
 
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
          throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
       }
 
-      SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name));
+      SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name);
 
       try {
          QueueQuery response = session.queueQuery(queueName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
index 2762a9c..1c70c5b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
@@ -73,7 +73,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
 
    // For testing only
    public ActiveMQStreamMessage() {
-      message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500);
+      message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500, null);
    }
 
    // Public --------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
index e22e67b..4dbefec 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
 
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+
 /**
  * ActiveMQ Artemis implementation of a JMS Topic.
  * <br>
@@ -33,13 +35,17 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
 
    // Constructors --------------------------------------------------
    public ActiveMQTopic() {
-      this(null);
+      this((SimpleString) null);
    }
 
    public ActiveMQTopic(final String address) {
       this(address, false);
    }
 
+   public ActiveMQTopic(final SimpleString address) {
+      super(address, TYPE.TOPIC, null);
+   }
+
    public ActiveMQTopic(final String address, boolean temporary) {
       super(address, TYPE.TOPIC, null);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 2bdd88a..cdab412 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
@@ -70,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
    boolean bufferValid;
    Boolean durable;
    long messageID;
-   String address;
+   SimpleString address;
    MessageImpl protonMessage;
    private volatile int memoryEstimate = -1;
    private long expiration = 0;
@@ -90,6 +91,7 @@ public class AMQPMessage extends RefCountMessage {
    private ApplicationProperties applicationProperties;
    private long scheduledTime = -1;
    private String connectionID;
+   private final CoreMessageObjectPools coreMessageObjectPools;
 
    Set<Object> rejectedConsumers;
 
@@ -98,9 +100,14 @@ public class AMQPMessage extends RefCountMessage {
    private volatile TypedProperties extraProperties;
 
    public AMQPMessage(long messageFormat, byte[] data) {
+      this(messageFormat, data, null);
+   }
+
+   public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
       this.data = Unpooled.wrappedBuffer(data);
       this.messageFormat = messageFormat;
       this.bufferValid = true;
+      this.coreMessageObjectPools = coreMessageObjectPools;
       parseHeaders();
    }
 
@@ -108,12 +115,14 @@ public class AMQPMessage extends RefCountMessage {
    public AMQPMessage(long messageFormat) {
       this.messageFormat = messageFormat;
       this.bufferValid = false;
+      this.coreMessageObjectPools = null;
    }
 
    public AMQPMessage(long messageFormat, Message message) {
       this.messageFormat = messageFormat;
       this.protonMessage = (MessageImpl) message;
       this.bufferValid = false;
+      this.coreMessageObjectPools = null;
    }
 
    public AMQPMessage(Message message) {
@@ -301,7 +310,7 @@ public class AMQPMessage extends RefCountMessage {
       parseHeaders();
 
       if (_properties != null && _properties.getGroupId() != null) {
-         return SimpleString.toSimpleString(_properties.getGroupId());
+         return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
       } else {
          return null;
       }
@@ -588,36 +597,33 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public String getAddress() {
-      if (address == null) {
-         Properties properties = getProtonMessage().getProperties();
-         if (properties != null) {
-            return properties.getTo();
-         } else {
-            return null;
-         }
-      } else {
-         return address;
-      }
+      SimpleString addressSimpleString = getAddressSimpleString();
+      return addressSimpleString == null ? null : addressSimpleString.toString();
    }
 
    @Override
    public AMQPMessage setAddress(String address) {
-      this.address = address;
+      this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
       return this;
    }
 
    @Override
    public AMQPMessage setAddress(SimpleString address) {
-      if (address != null) {
-         return setAddress(address.toString());
-      } else {
-         return setAddress((String) null);
-      }
+      this.address = address;
+      return this;
    }
 
    @Override
    public SimpleString getAddressSimpleString() {
-      return SimpleString.toSimpleString(getAddress());
+      if (address == null) {
+         Properties properties = getProtonMessage().getProperties();
+         if (properties != null) {
+            setAddress(properties.getTo());
+         } else {
+            return null;
+         }
+      }
+      return address;
    }
 
    @Override
@@ -977,7 +983,7 @@ public class AMQPMessage extends RefCountMessage {
       if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties);
       if (_properties != null) {
          if (address != null) {
-            _properties.setTo(address);
+            _properties.setTo(address.toString());
          }
          getProtonMessage().setProperties(this._properties);
       }
@@ -987,7 +993,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
-      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
+      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool());
    }
 
    @Override
@@ -1066,10 +1072,15 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
+      return putStringProperty(key.toString(), value);
+   }
+
+   @Override
    public Set<SimpleString> getPropertyNames() {
       HashSet<SimpleString> values = new HashSet<>();
       for (Object k : getApplicationPropertiesMap().keySet()) {
-         values.add(SimpleString.toSimpleString(k.toString()));
+         values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
       }
       return values;
    }
@@ -1084,17 +1095,22 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public ICoreMessage toCore() {
+   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       try {
-         return AMQPConverter.getInstance().toCore(this);
+         return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools);
       } catch (Exception e) {
          throw new RuntimeException(e.getMessage(), e);
       }
    }
 
    @Override
+   public ICoreMessage toCore() {
+      return toCore(null);
+   }
+
+   @Override
    public SimpleString getLastValueProperty() {
-      return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString());
+      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
    }
 
    @Override
@@ -1155,4 +1171,12 @@ public class AMQPMessage extends RefCountMessage {
          ", address=" + getAddress() +
          "]";
    }
+
+   private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+      return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
+   }
+
+   private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+      return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 19348f4..7134d3b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -101,6 +102,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AtomicBoolean draining = new AtomicBoolean(false);
 
+   private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
    private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
 
@@ -210,14 +212,14 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    public Object createSender(ProtonServerSenderContext protonSender,
-                              String queue,
+                              SimpleString queue,
                               String filter,
                               boolean browserOnly) throws Exception {
       long consumerID = consumerIDGenerator.generateID();
 
       filter = SelectorTranslator.convertToActiveMQFilterString(filter);
 
-      ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null);
+      ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
 
       // AMQP handles its own flow control for when it's started
       consumer.setStarted(true);
@@ -233,48 +235,48 @@ public class AMQPSessionCallback implements SessionCallback {
       serverConsumer.receiveCredits(-1);
    }
 
-   public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
+   public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception {
+      serverSession.createQueue(queueName, queueName, routingType, null, true, false);
    }
 
-   public void createTemporaryQueue(String address,
-                                    String queueName,
+   public void createTemporaryQueue(SimpleString address,
+                                    SimpleString queueName,
                                     RoutingType routingType,
-                                    String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
+                                    SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, true, false);
    }
 
-   public void createUnsharedDurableQueue(String address,
+   public void createUnsharedDurableQueue(SimpleString address,
                                           RoutingType routingType,
-                                          String queueName,
-                                          String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
+                                          SimpleString queueName,
+                                          SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false);
    }
 
-   public void createSharedDurableQueue(String address,
+   public void createSharedDurableQueue(SimpleString address,
                                         RoutingType routingType,
-                                        String queueName,
-                                        String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
+                                        SimpleString queueName,
+                                        SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
    }
 
-   public void createSharedVolatileQueue(String address,
+   public void createSharedVolatileQueue(SimpleString address,
                                          RoutingType routingType,
-                                         String queueName,
-                                         String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
+                                         SimpleString queueName,
+                                         SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
    }
 
-   public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
-      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+   public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
+      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);
 
       if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
          try {
-            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true);
+            serverSession.createQueue(queueName, queueName, routingType, null, false, true, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+         queueQueryResult = serverSession.executeQueueQuery(queueName);
       }
 
       // if auto-create we will return whatever type was used before
@@ -287,32 +289,31 @@ public class AMQPSessionCallback implements SessionCallback {
 
 
 
-   public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
+   public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception {
       BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address);
 
       if (bindingQueryResult != null) {
          return bindingQueryResult.isExists();
       }
 
-      SimpleString simpleAddress = SimpleString.toSimpleString(address);
-      bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+      bindingQueryResult = serverSession.executeBindingQuery(address);
       if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
          try {
-            serverSession.createAddress(simpleAddress, routingType, true);
+            serverSession.createAddress(address, routingType, true);
          } catch (ActiveMQAddressExistsException e) {
             // The address may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+         bindingQueryResult = serverSession.executeBindingQuery(address);
       } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
-         QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress);
+         QueueQueryResult queueBinding = serverSession.executeQueueQuery(address);
          if (!queueBinding.isExists()) {
             try {
-               serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
+               serverSession.createQueue(address, address, routingType, null, false, true, true);
             } catch (ActiveMQQueueExistsException e) {
                // The queue may have been created by another thread in the mean time.  Catch and do nothing.
             }
          }
-         bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+         bindingQueryResult = serverSession.executeBindingQuery(address);
       }
 
       bindingQueryCache.setResult(address, bindingQueryResult);
@@ -320,7 +321,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
 
-   public AddressQueryResult addressQuery(String addressName,
+   public AddressQueryResult addressQuery(SimpleString addressName,
                                           RoutingType routingType,
                                           boolean autoCreate) throws Exception {
 
@@ -329,15 +330,15 @@ public class AMQPSessionCallback implements SessionCallback {
          return addressQueryResult;
       }
 
-      addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+      addressQueryResult = serverSession.executeAddressQuery(addressName);
 
       if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
          try {
-            serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
+            serverSession.createAddress(addressName, routingType, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+         addressQueryResult = serverSession.executeAddressQuery(addressName);
       }
 
       addressQueryCache.setResult(addressName, addressQueryResult);
@@ -438,15 +439,15 @@ public class AMQPSessionCallback implements SessionCallback {
                           final Transaction transaction,
                           final Receiver receiver,
                           final Delivery delivery,
-                          String address,
+                          SimpleString address,
                           int messageFormat,
                           byte[] data) throws Exception {
-      AMQPMessage message = new AMQPMessage(messageFormat, data);
+      AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
       if (address != null) {
-         message.setAddress(new SimpleString(address));
+         message.setAddress(address);
       } else {
          // Anonymous relay must set a To value
-         address = message.getAddress();
+         address = message.getAddressSimpleString();
          if (address == null) {
             rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
             return;
@@ -552,7 +553,7 @@ public class AMQPSessionCallback implements SessionCallback {
       });
    }
 
-   public void offerProducerCredit(final String address,
+   public void offerProducerCredit(final SimpleString address,
                                    final int credits,
                                    final int threshold,
                                    final Receiver receiver) {
@@ -567,7 +568,7 @@ public class AMQPSessionCallback implements SessionCallback {
             connection.flush();
             return;
          }
-         final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
+         final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
          store.checkMemory(new Runnable() {
             @Override
             public void run() {
@@ -587,8 +588,8 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public void deleteQueue(String queueName) throws Exception {
-      manager.getServer().destroyQueue(new SimpleString(queueName));
+   public void deleteQueue(SimpleString queueName) throws Exception {
+      manager.getServer().destroyQueue(queueName);
    }
 
    public void resetContext(OperationContext oldContext) {
@@ -657,7 +658,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumer, String queueName) {
+   public void disconnect(ServerConsumer consumer, SimpleString queueName) {
       ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
       connection.lock();
       try {
@@ -703,12 +704,12 @@ public class AMQPSessionCallback implements SessionCallback {
       return serverSession.getAddress(address);
    }
 
-   public void removeTemporaryQueue(String address) throws Exception {
-      serverSession.deleteQueue(SimpleString.toSimpleString(address));
+   public void removeTemporaryQueue(SimpleString address) throws Exception {
+      serverSession.deleteQueue(address);
    }
 
-   public RoutingType getDefaultRoutingType(String address) {
-      return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType();
+   public RoutingType getDefaultRoutingType(SimpleString address) {
+      return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType();
    }
 
    public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
@@ -733,10 +734,10 @@ public class AMQPSessionCallback implements SessionCallback {
 
 
    class AddressQueryCache<T> {
-      String address;
+      SimpleString address;
       T result;
 
-      public synchronized T getResult(String parameterAddress) {
+      public synchronized T getResult(SimpleString parameterAddress) {
          if (address != null && address.equals(parameterAddress)) {
             return result;
          } else {
@@ -746,7 +747,7 @@ public class AMQPSessionCallback implements SessionCallback {
          }
       }
 
-      public synchronized void setResult(String parameterAddress, T result) {
+      public synchronized void setResult(SimpleString parameterAddress, T result) {
          this.address = parameterAddress;
          this.result = result;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
index 724474b..e67fc67 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 
@@ -38,7 +39,7 @@ public class AMQPConverter implements MessageConverter<AMQPMessage> {
    }
 
    @Override
-   public ICoreMessage toCore(AMQPMessage messageSource) throws Exception {
-      return AmqpCoreConverter.toCore(messageSource);
+   public ICoreMessage toCore(AMQPMessage messageSource, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+      return AmqpCoreConverter.toCore(messageSource, coreMessageObjectPools);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index da2f4e0..1bac1e5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -242,56 +243,56 @@ public final class AMQPMessageSupport {
       return null;
    }
 
-   public static ServerJMSBytesMessage createBytesMessage(long id) {
-      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE));
+   public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException {
-      ServerJMSBytesMessage message = createBytesMessage(id);
+   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools);
       message.writeBytes(array, arrayOffset, length);
       return message;
    }
 
-   public static ServerJMSStreamMessage createStreamMessage(long id) {
-      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE));
+   public static ServerJMSStreamMessage createStreamMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSMessage createMessage(long id) {
-      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE));
+   public static ServerJMSMessage createMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSTextMessage createTextMessage(long id) {
-      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE));
+   public static ServerJMSTextMessage createTextMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException {
-      ServerJMSTextMessage message = createTextMessage(id);
+   public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools);
       message.setText(text);
       return message;
    }
 
-   public static ServerJMSObjectMessage createObjectMessage(long id) {
-      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE));
+   public static ServerJMSObjectMessage createObjectMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(id);
+   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
       message.setSerializedForm(serializedForm);
       return message;
    }
 
-   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(id);
+   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
       message.setSerializedForm(new Binary(array, offset, length));
       return message;
    }
 
-   public static ServerJMSMapMessage createMapMessage(long id) {
-      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE));
+   public static ServerJMSMapMessage createMapMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException {
-      ServerJMSMapMessage message = createMapMessage(id);
+   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools);
       final Set<Map.Entry<String, Object>> set = content.entrySet();
       for (Map.Entry<String, Object> entry : set) {
          Object value = entry.getValue();
@@ -304,8 +305,8 @@ public final class AMQPMessageSupport {
       return message;
    }
 
-   private static CoreMessage newMessage(long id, byte messageType) {
-      CoreMessage message = new CoreMessage(id, 512);
+   private static CoreMessage newMessage(long id, byte messageType, CoreMessageObjectPools coreMessageObjectPools) {
+      CoreMessage message = new CoreMessage(id, 512, coreMessageObjectPools);
       message.setType(messageType);
 //      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index fbaf0ef..80969f6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -52,6 +52,7 @@ import javax.jms.JMSException;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -89,31 +90,31 @@ import io.netty.buffer.PooledByteBufAllocator;
 public class AmqpCoreConverter {
 
    @SuppressWarnings("unchecked")
-   public static ICoreMessage toCore(AMQPMessage message) throws Exception {
+   public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
       Section body = message.getProtonMessage().getBody();
       ServerJMSMessage result;
 
       if (body == null) {
          if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID());
+            result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
          } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID());
+            result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
          } else {
             Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
             if (charset != null) {
-               result = createTextMessage(message.getMessageID());
+               result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
             } else {
-               result = createMessage(message.getMessageID());
+               result = createMessage(message.getMessageID(), coreMessageObjectPools);
             }
          }
       } else if (body instanceof Data) {
          Binary payload = ((Data) body).getValue();
 
          if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
          } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
          } else {
             Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
             if (StandardCharsets.UTF_8.equals(charset)) {
@@ -121,18 +122,18 @@ public class AmqpCoreConverter {
 
                try {
                   CharBuffer chars = charset.newDecoder().decode(buf);
-                  result = createTextMessage(message.getMessageID(), String.valueOf(chars));
+                  result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
                } catch (CharacterCodingException e) {
-                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
                }
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
          }
 
       } else if (body instanceof AmqpSequence) {
          AmqpSequence sequence = (AmqpSequence) body;
-         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
          for (Object item : sequence.getValue()) {
             m.writeObject(item);
          }
@@ -141,31 +142,31 @@ public class AmqpCoreConverter {
       } else if (body instanceof AmqpValue) {
          Object value = ((AmqpValue) body).getValue();
          if (value == null || value instanceof String) {
-            result = createTextMessage(message.getMessageID(), (String) value);
+            result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
 
          } else if (value instanceof Binary) {
             Binary payload = (Binary) value;
 
             if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-               result = createObjectMessage(message.getMessageID(), payload);
+               result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
 
          } else if (value instanceof List) {
-            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
             for (Object item : (List<Object>) value) {
                m.writeObject(item);
             }
             result = m;
          } else if (value instanceof Map) {
-            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value);
+            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
          } else {
             ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
             try {
                TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
                TLSEncode.getEncoder().writeObject(body);
-               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex());
+               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
             } finally {
                buf.release();
                TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@@ -186,7 +187,7 @@ public class AmqpCoreConverter {
       result.getInnerMessage().setReplyTo(message.getReplyTo());
       result.getInnerMessage().setDurable(message.isDurable());
       result.getInnerMessage().setPriority(message.getPriority());
-      result.getInnerMessage().setAddress(message.getAddress());
+      result.getInnerMessage().setAddress(message.getAddressSimpleString());
 
       result.encode();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 3e1c0fe..3c35d76 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -54,7 +54,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    protected final Receiver receiver;
 
-   protected String address;
+   protected SimpleString address;
 
    protected final AMQPSessionCallback sessionSPI;
 
@@ -102,7 +102,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (target.getDynamic()) {
             // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
             // will be deleted on closing of the session
-            address = sessionSPI.tempQueueName();
+            address = SimpleString.toSimpleString(sessionSPI.tempQueueName());
             defRoutingType = getRoutingType(target.getCapabilities(), address);
 
             try {
@@ -113,12 +113,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
             expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
-            target.setAddress(address);
+            target.setAddress(address.toString());
          } else {
             // the target will have an address unless the remote is requesting an anonymous
             // relay in which case the address in the incoming message's to field will be
             // matched on receive of the message.
-            address = target.getAddress();
+            address = SimpleString.toSimpleString(target.getAddress());
 
             if (address != null && !address.isEmpty()) {
                defRoutingType = getRoutingType(target.getCapabilities(), address);
@@ -134,7 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
                }
 
                try {
-                  sessionSPI.check(SimpleString.toSimpleString(address), CheckType.SEND, new SecurityAuth() {
+                  sessionSPI.check(address, CheckType.SEND, new SecurityAuth() {
                      @Override
                      public String getUsername() {
                         String username = null;
@@ -181,12 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       flow(amqpCredits, minCreditRefresh);
    }
 
-   public RoutingType getRoutingType(Receiver receiver, String address) {
+   public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
       org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
       return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address);
    }
 
-   private RoutingType getRoutingType(Symbol[] symbols, String address) {
+   private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
       if (symbols != null) {
          for (Symbol symbol : symbols) {
             if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@@ -264,7 +264,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
       if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
          try {
-            sessionSPI.removeTemporaryQueue(target.getAddress());
+            sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(target.getAddress()));
          } catch (Exception e) {
             //ignore on close, its temp anyway and will be removed later
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fbaae8a..1823168 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -102,7 +102,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean shared = false;
    private boolean global = false;
    private boolean isVolatile = false;
-   private String tempQueueName;
+   private SimpleString tempQueueName;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
                                     Sender sender,
@@ -157,7 +157,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       super.initialise();
 
       Source source = (Source) sender.getRemoteSource();
-      String queue = null;
+      SimpleString queue = null;
       String selector = null;
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -199,7 +199,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // the lifetime policy and capabilities of the new subscription.
          if (result.isExists()) {
             source = new org.apache.qpid.proton.amqp.messaging.Source();
-            source.setAddress(queue);
+            source.setAddress(queue.toString());
             source.setDurable(TerminusDurability.UNSETTLED_STATE);
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
             source.setDistributionMode(COPY);
@@ -240,7 +240,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       } else if (source.getDynamic()) {
          // if dynamic we have to create the node (queue) and set the address on the target, the
          // node is temporary and  will be deleted on closing of the session
-         queue = java.util.UUID.randomUUID().toString();
+         queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
          tempQueueName = queue;
          try {
             sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
@@ -248,7 +248,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          } catch (Exception e) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
          }
-         source.setAddress(queue);
+         source.setAddress(queue.toString());
       } else {
          SimpleString addressToUse;
          SimpleString queueNameToUse = null;
@@ -269,7 +269,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             multicast = hasCapabilities(TOPIC, source);
             AddressQueryResult addressQueryResult = null;
             try {
-               addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
+               addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
             } catch (ActiveMQSecurityException e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
             } catch (ActiveMQAMQPException e) {
@@ -294,7 +294,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             // if not we look up the address
             AddressQueryResult addressQueryResult = null;
             try {
-               addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
+               addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true);
             } catch (ActiveMQSecurityException e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
             } catch (ActiveMQAMQPException e) {
@@ -333,6 +333,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
 
             queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
+            SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
 
             //if the address specifies a broker configured queue then we always use this, treat it as a queue
             if (queue != null) {
@@ -345,24 +346,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                String pubId = sender.getName();
                queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
                QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
-
                if (result.isExists()) {
                   // If a client reattaches to a durable subscription with a different no-local
                   // filter value, selector or address then we must recreate the queue (JMS semantics).
-                  if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+                  if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
 
                      if (result.getConsumerCount() == 0) {
                         sessionSPI.deleteQueue(queue);
-                        sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                        sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                      } else {
                         throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                      }
                   }
                } else {
                   if (shared) {
-                     sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                     sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                   } else {
-                     sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                     sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                   }
                }
             } else {
@@ -371,15 +371,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                if (shared && sender.getName() != null) {
                   queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
                   try {
-                     sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                     sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                   } catch (ActiveMQQueueExistsException e) {
                      //this is ok, just means its shared
                   }
                } else {
-                  queue = java.util.UUID.randomUUID().toString();
+                  queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
                   tempQueueName = queue;
                   try {
-                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
+                     sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
                   } catch (Exception e) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                   }
@@ -387,18 +387,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
          } else {
             if (queueNameToUse != null) {
-               SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST));
+               SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
                if (matchingAnycastQueue != null) {
-                  queue = matchingAnycastQueue.toString();
+                  queue = matchingAnycastQueue;
                } else {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
             } else {
                SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
                if (matchingAnycastQueue != null) {
-                  queue = matchingAnycastQueue.toString();
+                  queue = matchingAnycastQueue;
                } else {
-                  queue = addressToUse.toString();
+                  queue = addressToUse;
                }
             }
 
@@ -437,16 +437,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
-   private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
+   private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
       if (queueName != null) {
-         QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false);
+         QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false);
          if (!result.isExists()) {
             throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
          } else {
             if (!result.getAddress().equals(address)) {
                throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
             }
-            return sessionSPI.getMatchingQueue(address, queueName, routingType).toString();
+            return sessionSPI.getMatchingQueue(address, queueName, routingType);
          }
       }
       return null;
@@ -495,7 +495,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
             if (source != null && source.getAddress() != null && multicast) {
-               String queueName = source.getAddress();
+               SimpleString queueName = SimpleString.toSimpleString(source.getAddress());
                QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
@@ -508,7 +508,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                      if (pubId.contains("|")) {
                         pubId = pubId.split("\\|")[0];
                      }
-                     String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
+                     SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
                      result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
                      //only delete if it isn't volatile and has no consumers
                      if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@@ -518,7 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                }
             } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
                try {
-                  sessionSPI.removeTemporaryQueue(source.getAddress());
+                  sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
                } catch (Exception e) {
                   //ignore on close, its temp anyway and will be removed later
                }
@@ -760,7 +760,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(boolean useCoreSubscriptionNaming,
+   private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
                                          String clientId,
                                          String pubId,
                                          boolean shared,
@@ -784,7 +784,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                queue += ":global";
             }
          }
-         return queue;
+         return SimpleString.toSimpleString(queue);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index d06464f..ccafd37 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -620,7 +620,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
-      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0);
+      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null);
 
       if (compression) {
          // TODO
@@ -647,7 +647,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
-      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0);
+      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null);
 
       if (compression) {
          // TODO

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 73dbeaa..da10f47 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -56,11 +57,12 @@ public class MQTTSession {
 
    private MQTTProtocolManager protocolManager;
 
-
    private boolean isClean;
 
    private WildcardConfiguration wildcardConfiguration;
 
+   private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    public MQTTSession(MQTTProtocolHandler protocolHandler,
                       MQTTConnection connection,
                       MQTTProtocolManager protocolManager,
@@ -195,4 +197,8 @@ public class MQTTSession {
    public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) {
       this.wildcardConfiguration = wildcardConfiguration;
    }
+
+   public CoreMessageObjectPools getCoreMessageObjectPools() {
+      return coreMessageObjectPools;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 21b1f2b..39e2ba9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -78,7 +78,7 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumer, String queueName) {
+   public void disconnect(ServerConsumer consumer, SimpleString queueName) {
       try {
          consumer.removeItself();
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 2cb1f7e..2667f81 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -64,13 +64,13 @@ public class MQTTUtil {
 
    public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
 
-   public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
+   public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level");
 
-   public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id";
+   public static final SimpleString MQTT_MESSAGE_ID_KEY = SimpleString.toSimpleString("mqtt.message.id");
 
-   public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
+   public static final SimpleString MQTT_MESSAGE_TYPE_KEY = SimpleString.toSimpleString("mqtt.message.type");
 
-   public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
+   public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain");
 
    public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
 
@@ -113,10 +113,10 @@ public class MQTTUtil {
                                                     int qos) {
       long id = session.getServer().getStorageManager().generateID();
 
-      CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+      CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE, session.getCoreMessageObjectPools());
       message.setAddress(address);
       message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
-      message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+      message.putIntProperty(MQTT_QOS_LEVEL_KEY, qos);
       message.setType(Message.BYTES_TYPE);
       return message;
    }
@@ -127,7 +127,8 @@ public class MQTTUtil {
                                                               int qos,
                                                               ByteBuf payload) {
       String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
-      ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+      SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
+      ICoreMessage message = createServerMessage(session, address, retain, qos);
 
       message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
       return message;
@@ -135,8 +136,8 @@ public class MQTTUtil {
 
    public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
       Message message = createServerMessage(session, address, false, 1);
-      message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
-      message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
+      message.putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId);
+      message.putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value());
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 9923953..86a95db 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1121,7 +1121,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-         SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+         SimpleString subQueueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName());
          server.destroyQueue(subQueueName);
 
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 6af9997..83ff6d6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -108,10 +109,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    }
 
    @Override
-   public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception {
+   public ICoreMessage toCore(OpenwireMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
       return null;
    }
 
+
    //   @Override
    public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
       // TODO: implement this
@@ -119,10 +121,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    }
 
 //   @Override
-   public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
+   public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
       Message messageSend = (Message) message;
-      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
+      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
 
       String type = messageSend.getType();
       if (type != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index d28eda4..c63fe19 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
 // TODO: Implement this
@@ -442,6 +443,11 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
+   public Message putStringProperty(SimpleString key, String value) {
+      return null;
+   }
+
+   @Override
    public int getEncodeSize() {
       return 0;
    }
@@ -478,6 +484,11 @@ public class OpenwireMessage implements Message {
 
    @Override
    public ICoreMessage toCore() {
+      return toCore(null);
+   }
+
+   @Override
+   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       return null;
    }
 


[4/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more generic

Posted by mi...@apache.org.
ARTEMIS-1586 Refactor to make more generic

* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits)
* Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where.
* reduce SimpleString creation in conversion to/from core message methods with JMS wrapper.
* reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98028cde
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98028cde
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98028cde

Branch: refs/heads/master
Commit: 98028cdecc6bd31c86a7d6decfed1961e46be7b2
Parents: 8d776ed
Author: Michael André Pearce <mi...@me.com>
Authored: Wed Jan 10 08:48:14 2018 +0000
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 237 ++++++++-----------
 .../artemis/utils/AbstractByteBufPool.java      | 164 +++++++++++++
 .../artemis/utils/AbstractInterner.java         | 157 ------------
 .../activemq/artemis/utils/AbstractPool.java    |  89 +++++++
 .../apache/activemq/artemis/utils/ByteUtil.java | 122 ++++++++++
 .../utils/collections/TypedProperties.java      | 122 +++++++---
 .../activemq/artemis/api/core/Message.java      |   6 +
 .../core/client/impl/ClientMessageImpl.java     |  23 +-
 .../core/client/impl/ClientSessionImpl.java     |   5 +-
 .../artemis/core/message/impl/CoreMessage.java  | 107 +++++----
 .../message/impl/CoreMessageObjectPools.java    |  55 +++++
 .../core/protocol/ClientPacketDecoder.java      |  11 +-
 .../impl/ActiveMQClientProtocolManager.java     |   2 +-
 .../activemq/artemis/reader/MessageUtil.java    |  10 +-
 .../artemis/message/CoreMessageTest.java        |   2 +-
 .../artemis/jms/client/ActiveMQDestination.java |  20 +-
 .../artemis/jms/client/ActiveMQMessage.java     |  55 ++---
 .../artemis/jms/client/ActiveMQQueue.java       |   8 +-
 .../artemis/jms/client/ActiveMQSession.java     |   6 +-
 .../jms/client/ActiveMQStreamMessage.java       |   2 +-
 .../artemis/jms/client/ActiveMQTopic.java       |   8 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  74 ++++--
 .../amqp/broker/AMQPSessionCallback.java        | 105 ++++----
 .../protocol/amqp/converter/AMQPConverter.java  |   5 +-
 .../amqp/converter/AMQPMessageSupport.java      |  49 ++--
 .../amqp/converter/AmqpCoreConverter.java       |  37 +--
 .../proton/ProtonServerReceiverContext.java     |  16 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  54 ++---
 .../JMSMappingOutboundTransformerTest.java      |   4 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java |   8 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |   2 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  19 +-
 .../protocol/openwire/OpenWireConnection.java   |   2 +-
 .../openwire/OpenWireMessageConverter.java      |   8 +-
 .../core/protocol/openwire/OpenwireMessage.java |  11 +
 .../core/protocol/openwire/amq/AMQConsumer.java |   2 +-
 .../core/protocol/openwire/amq/AMQSession.java  |  11 +-
 .../core/protocol/stomp/StompSession.java       |   2 +-
 .../ra/inflow/ActiveMQMessageHandler.java       |   2 +-
 .../core/protocol/ServerPacketDecoder.java      |  25 +-
 .../protocol/core/impl/CoreSessionCallback.java |  11 +-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 .../spi/core/protocol/MessageConverter.java     |   3 +-
 .../spi/core/protocol/SessionCallback.java      |   2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  11 +
 .../integration/client/AcknowledgeTest.java     |  11 +
 .../integration/client/HangConsumerTest.java    |   2 +-
 .../persistence/XmlImportExportTest.java        |   2 +-
 48 files changed, 1048 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index e24e245..dbf7468 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,8 +21,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.activemq.artemis.utils.AbstractInterner;
+import org.apache.activemq.artemis.utils.AbstractByteBufPool;
+import org.apache.activemq.artemis.utils.AbstractPool;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -33,129 +34,6 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
 
-   public static final class Interner extends AbstractInterner<SimpleString> {
-
-      private final int maxLength;
-
-      public Interner(final int capacity, final int maxCharsLength) {
-         super(capacity);
-         this.maxLength = maxCharsLength;
-      }
-
-      @Override
-      protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
-         return SimpleString.isEqual(entry, byteBuf, offset, length);
-      }
-
-      @Override
-      protected boolean canIntern(final ByteBuf byteBuf, final int length) {
-         assert length % 2 == 0 : "length must be a multiple of 2";
-         final int expectedStringLength = length >> 1;
-         return expectedStringLength <= maxLength;
-      }
-
-      @Override
-      protected SimpleString create(final ByteBuf byteBuf, final int length) {
-         return readSimpleString(byteBuf, length);
-      }
-   }
-
-   /**
-    * Returns {@code true} if  the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
-    * {@code false} otherwise.
-    * <p>
-    * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
-    * length field.
-    */
-   public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) {
-      if (s == null) {
-         return false;
-      }
-      final byte[] chars = s.getData();
-      if (chars.length != length)
-         return false;
-      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
-         if ((offset + length) > bytes.writerIndex()) {
-            throw new IndexOutOfBoundsException();
-         }
-         if (bytes.hasArray()) {
-            return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length);
-         } else if (bytes.hasMemoryAddress()) {
-            return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
-         }
-      }
-      return byteBufIsEqual(chars, bytes, offset, length);
-   }
-
-   private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) {
-      for (int i = 0; i < length; i++)
-         if (chars[i] != bytes.getByte(offset + i))
-            return false;
-      return true;
-   }
-
-   private static boolean batchOnHeapIsEqual(final byte[] chars,
-                                             final byte[] array,
-                                             final int arrayOffset,
-                                             final int length) {
-      final int longCount = length >>> 3;
-      final int bytesCount = length & 7;
-      int bytesIndex = arrayOffset;
-      int charsIndex = 0;
-      for (int i = 0; i < longCount; i++) {
-         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
-         final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
-         if (charsLong != bytesLong) {
-            return false;
-
-         }
-         bytesIndex += 8;
-         charsIndex += 8;
-      }
-      for (int i = 0; i < bytesCount; i++) {
-         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
-         final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
-         if (charsByte != bytesByte) {
-            return false;
-
-         }
-         bytesIndex++;
-         charsIndex++;
-      }
-      return true;
-   }
-
-   private static boolean batchOffHeapIsEqual(final byte[] chars,
-                                              final long address,
-                                              final int offset,
-                                              final int length) {
-      final int longCount = length >>> 3;
-      final int bytesCount = length & 7;
-      long bytesAddress = address + offset;
-      int charsIndex = 0;
-      for (int i = 0; i < longCount; i++) {
-         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
-         final long bytesLong = PlatformDependent.getLong(bytesAddress);
-         if (charsLong != bytesLong) {
-            return false;
-
-         }
-         bytesAddress += 8;
-         charsIndex += 8;
-      }
-      for (int i = 0; i < bytesCount; i++) {
-         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
-         final byte bytesByte = PlatformDependent.getByte(bytesAddress);
-         if (charsByte != bytesByte) {
-            return false;
-
-         }
-         bytesAddress++;
-         charsIndex++;
-      }
-      return true;
-   }
-
    private static final long serialVersionUID = 4204223851422244307L;
 
    // Attributes
@@ -185,6 +63,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       return new SimpleString(string);
    }
 
+   public static SimpleString toSimpleString(final String string, StringSimpleStringPool pool) {
+      if (pool == null) {
+         return toSimpleString(string);
+      }
+      return pool.getOrCreate(string);
+   }
+
    // Constructors
    // ----------------------------------------------------------------------
 
@@ -236,6 +121,10 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       data[1] = high;
    }
 
+   public boolean isEmpty() {
+      return data.length == 0;
+   }
+
    // CharSequence implementation
    // ---------------------------------------------------------------------------
 
@@ -267,11 +156,26 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       return readSimpleString(buffer);
    }
 
+   public static SimpleString readNullableSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) {
+      int b = buffer.readByte();
+      if (b == DataConstants.NULL) {
+         return null;
+      }
+      return readSimpleString(buffer, pool);
+   }
+
    public static SimpleString readSimpleString(ByteBuf buffer) {
       int len = buffer.readInt();
       return readSimpleString(buffer, len);
    }
 
+   public static SimpleString readSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) {
+      if (pool == null) {
+         return readSimpleString(buffer);
+      }
+      return pool.getOrCreate(buffer);
+   }
+
    public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
       byte[] data = new byte[length];
       buffer.readBytes(data);
@@ -381,22 +285,23 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       if (other instanceof SimpleString) {
          SimpleString s = (SimpleString) other;
 
-         if (data.length != s.data.length) {
-            return false;
-         }
-
-         for (int i = 0; i < data.length; i++) {
-            if (data[i] != s.data[i]) {
-               return false;
-            }
-         }
-
-         return true;
+         return ByteUtil.equals(data, s.data);
       } else {
          return false;
       }
    }
 
+   /**
+    * Returns {@code true} if  the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
+    * {@code false} otherwise.
+    * <p>
+    * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+    * length field.
+    */
+   public boolean equals(final ByteBuf byteBuf, final int offset, final int length) {
+      return ByteUtil.equals(data, byteBuf, offset, length);
+   }
+
    @Override
    public int hashCode() {
       if (hash == 0) {
@@ -575,4 +480,64 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
          dst[d++] = (char) (low | high);
       }
    }
+
+   public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
+
+      private static final int UUID_LENGTH = 36;
+
+      private final int maxLength;
+
+      public ByteBufSimpleStringPool() {
+         this.maxLength = UUID_LENGTH;
+      }
+
+      public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {
+         super(capacity);
+         this.maxLength = maxCharsLength;
+      }
+
+      @Override
+      protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
+         if (entry == null) {
+            return false;
+         }
+         return entry.equals(byteBuf, offset, length);
+      }
+
+      @Override
+      protected boolean canPool(final ByteBuf byteBuf, final int length) {
+         assert length % 2 == 0 : "length must be a multiple of 2";
+         final int expectedStringLength = length >> 1;
+         return expectedStringLength <= maxLength;
+      }
+
+      @Override
+      protected SimpleString create(final ByteBuf byteBuf, final int length) {
+         return readSimpleString(byteBuf, length);
+      }
+   }
+
+   public static final class StringSimpleStringPool extends AbstractPool<String, SimpleString> {
+
+      public StringSimpleStringPool() {
+         super();
+      }
+
+      public StringSimpleStringPool(final int capacity) {
+         super(capacity);
+      }
+
+      @Override
+      protected SimpleString create(String value) {
+         return toSimpleString(value);
+      }
+
+      @Override
+      protected boolean isEqual(SimpleString entry, String value) {
+         if (entry == null) {
+            return false;
+         }
+         return entry.toString().equals(value);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
new file mode 100644
index 0000000..87c1b6f
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractByteBufPool<T> {
+
+   public static final int DEFAULT_POOL_CAPACITY = 32;
+
+   private final T[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractByteBufPool() {
+      this(DEFAULT_POOL_CAPACITY);
+   }
+
+   public AbstractByteBufPool(final int capacity) {
+      entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Batch hash code implementation that works at its best if {@code bytes}
+    * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
+    */
+   private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //if the platform allows it, the hash code could be computed without bounds checking
+         if (bytes.hasArray()) {
+            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
+         } else if (bytes.hasMemoryAddress()) {
+            return offHeapHashCode(bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufHashCode(bytes, offset, length);
+   }
+
+   private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int offHeapHashCode(final long address, final int offset, final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         final short shortLE = byteBuf.getShortLE(arrayIndex);
+         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
+         hashCode = 31 * hashCode + nativeShort;
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   /**
+    * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be pooled,
+    * {@code false} otherwise.
+    */
+   protected abstract boolean canPool(ByteBuf byteBuf, int length);
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract T create(ByteBuf byteBuf, int length);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
+    * and {@code length} {@code false} otherwise.
+    */
+   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
+
+   /**
+    * Returns a pooled entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
+    */
+   public final T getOrCreate(final ByteBuf byteBuf) {
+      final int length = byteBuf.readInt();
+      if (!canPool(byteBuf, length)) {
+         return create(byteBuf, length);
+      } else {
+         if (!byteBuf.isReadable(length)) {
+            throw new IndexOutOfBoundsException();
+         }
+         final int bytesOffset = byteBuf.readerIndex();
+         final int hashCode = hashCode(byteBuf, bytesOffset, length);
+         //fast % operation with power of 2 entries.length
+         final int firstIndex = hashCode & mask;
+         final T firstEntry = entries[firstIndex];
+         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return firstEntry;
+         }
+         final int secondIndex = (hashCode >> shift) & mask;
+         final T secondEntry = entries[secondIndex];
+         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return secondEntry;
+         }
+         final T internedEntry = create(byteBuf, length);
+         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+         entries[entryIndex] = internedEntry;
+         return internedEntry;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
deleted file mode 100644
index 7e1fe40..0000000
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.utils;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.internal.MathUtil;
-import io.netty.util.internal.PlatformDependent;
-
-/**
- * Thread-safe {@code <T>} interner.
- * <p>
- * Differently from {@link String#intern()} it contains a fixed amount of entries and
- * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
- * the same entry could be allocated multiple times by concurrent calls.
- */
-public abstract class AbstractInterner<T> {
-
-   private final T[] entries;
-   private final int mask;
-   private final int shift;
-
-   public AbstractInterner(final int capacity) {
-      entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
-      mask = entries.length - 1;
-      //log2 of entries.length
-      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
-   }
-
-   /**
-    * Batch hash code implementation that works at its best if {@code bytes}
-    * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
-    */
-   private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
-      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
-         //if the platform allows it, the hash code could be computed without bounds checking
-         if (bytes.hasArray()) {
-            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
-         } else if (bytes.hasMemoryAddress()) {
-            return offHeapHashCode(bytes.memoryAddress(), offset, length);
-         }
-      }
-      return byteBufHashCode(bytes, offset, length);
-   }
-
-   private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
-      final int intCount = length >>> 1;
-      final int byteCount = length & 1;
-      int hashCode = 1;
-      int arrayIndex = offset;
-      for (int i = 0; i < intCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
-         arrayIndex += 2;
-      }
-      for (int i = 0; i < byteCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
-      }
-      return hashCode;
-   }
-
-   private static int offHeapHashCode(final long address, final int offset, final int length) {
-      final int intCount = length >>> 1;
-      final int byteCount = length & 1;
-      int hashCode = 1;
-      int arrayIndex = offset;
-      for (int i = 0; i < intCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
-         arrayIndex += 2;
-      }
-      for (int i = 0; i < byteCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
-      }
-      return hashCode;
-   }
-
-   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
-      final int intCount = length >>> 1;
-      final int byteCount = length & 1;
-      int hashCode = 1;
-      int arrayIndex = offset;
-      for (int i = 0; i < intCount; i++) {
-         final short shortLE = byteBuf.getShortLE(arrayIndex);
-         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
-         hashCode = 31 * hashCode + nativeShort;
-         arrayIndex += 2;
-      }
-      for (int i = 0; i < byteCount; i++) {
-         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
-      }
-      return hashCode;
-   }
-
-   /**
-    * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned,
-    * {@code false} otherwise.
-    */
-   protected abstract boolean canIntern(ByteBuf byteBuf, int length);
-
-   /**
-    * Create a new entry.
-    */
-   protected abstract T create(ByteBuf byteBuf, int length);
-
-   /**
-    * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
-    * and {@code length} {@code false} otherwise.
-    */
-   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
-
-   /**
-    * Returns and interned entry if possible, a new one otherwise.
-    * <p>
-    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
-    */
-   public final T intern(final ByteBuf byteBuf, final int length) {
-      if (!canIntern(byteBuf, length)) {
-         return create(byteBuf, length);
-      } else {
-         if (!byteBuf.isReadable(length)) {
-            throw new IndexOutOfBoundsException();
-         }
-         final int bytesOffset = byteBuf.readerIndex();
-         final int hashCode = hashCode(byteBuf, bytesOffset, length);
-         //fast % operation with power of 2 entries.length
-         final int firstIndex = hashCode & mask;
-         final T firstEntry = entries[firstIndex];
-         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
-            byteBuf.skipBytes(length);
-            return firstEntry;
-         }
-         final int secondIndex = (hashCode >> shift) & mask;
-         final T secondEntry = entries[secondIndex];
-         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
-            byteBuf.skipBytes(length);
-            return secondEntry;
-         }
-         final T internedEntry = create(byteBuf, length);
-         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
-         entries[entryIndex] = internedEntry;
-         return internedEntry;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
new file mode 100644
index 0000000..cc42e8f
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractPool<I, O> {
+
+   public static final int DEFAULT_POOL_CAPACITY = 32;
+
+   private final O[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractPool() {
+      this(DEFAULT_POOL_CAPACITY);
+   }
+
+   public AbstractPool(final int capacity) {
+      entries = (O[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract O create(I value);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is equal to {@code value};
+    */
+   protected abstract boolean isEqual(O entry, I value);
+
+   protected int hashCode(I value) {
+      return value.hashCode();
+   }
+
+   /**
+    * Returns and interned entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
+    */
+   public final O getOrCreate(final I value) {
+      if (value == null) {
+         return null;
+      }
+      final int hashCode = hashCode(value);
+      //fast % operation with power of 2 entries.length
+      final int firstIndex = hashCode & mask;
+      final O firstEntry = entries[firstIndex];
+      if (isEqual(firstEntry, value)) {
+         return firstEntry;
+      }
+      final int secondIndex = (hashCode >> shift) & mask;
+      final O secondEntry = entries[secondIndex];
+      if (isEqual(secondEntry, value)) {
+         return secondEntry;
+      }
+      final O internedEntry = create(value);
+      final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+      entries[entryIndex] = internedEntry;
+      return internedEntry;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index e70891d..8835797 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -22,6 +22,7 @@ import java.util.regex.Pattern;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@@ -207,4 +208,125 @@ public class ByteUtil {
          throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text);
       }
    }
+
+   public static boolean equals(final byte[] left, final byte[] right) {
+      return equals(left, right, 0, right.length);
+   }
+
+   public static boolean equals(final byte[] left,
+                                final byte[] right,
+                                final int rightOffset,
+                                final int rightLength) {
+      if (left == right)
+         return true;
+      if (left == null || right == null)
+         return false;
+      if (left.length != rightLength)
+         return false;
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         return equalsUnsafe(left, right, rightOffset, rightLength);
+      } else {
+         return equalsSafe(left, right, rightOffset, rightLength);
+      }
+   }
+
+   private static boolean equalsSafe(byte[] left, byte[] right, int rightOffset, int rightLength) {
+      for (int i = 0; i < rightLength; i++)
+         if (left[i] != right[rightOffset + i])
+            return false;
+      return true;
+   }
+
+   private static boolean equalsUnsafe(final byte[] left,
+                                       final byte[] right,
+                                       final int rightOffset,
+                                       final int rightLength) {
+      final int longCount = rightLength >>> 3;
+      final int bytesCount = rightLength & 7;
+      int bytesIndex = rightOffset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(left, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(right, bytesIndex);
+         if (charsLong != bytesLong) {
+            return false;
+         }
+         bytesIndex += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(left, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(right, bytesIndex);
+         if (charsByte != bytesByte) {
+            return false;
+         }
+         bytesIndex++;
+         charsIndex++;
+      }
+      return true;
+   }
+
+
+   /**
+    * Returns {@code true} if  the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
+    * {@code false} otherwise.
+    * <p>
+    * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+    * length field.
+    */
+   public static boolean equals(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
+      if (bytes.length != length)
+         return false;
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         if ((offset + length) > byteBuf.writerIndex()) {
+            throw new IndexOutOfBoundsException();
+         }
+         if (byteBuf.hasArray()) {
+            return equals(bytes, byteBuf.array(), byteBuf.arrayOffset() + offset, length);
+         } else if (byteBuf.hasMemoryAddress()) {
+            return equalsOffHeap(bytes, byteBuf.memoryAddress(), offset, length);
+         }
+      }
+      return equalsOnHeap(bytes, byteBuf, offset, length);
+   }
+
+   private static boolean equalsOnHeap(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
+      if (bytes.length != length)
+         return false;
+      for (int i = 0; i < length; i++)
+         if (bytes[i] != byteBuf.getByte(offset + i))
+            return false;
+      return true;
+   }
+
+   private static boolean equalsOffHeap(final byte[] bytes,
+                                        final long address,
+                                        final int offset,
+                                        final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      long bytesAddress = address + offset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(bytes, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(bytesAddress);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesAddress += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(bytes, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(bytesAddress);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesAddress++;
+         charsIndex++;
+      }
+      return true;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index a3e4876..56beb76 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,7 +28,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
-import org.apache.activemq.artemis.utils.AbstractInterner;
+import org.apache.activemq.artemis.utils.AbstractByteBufPool;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -332,8 +332,7 @@ public class TypedProperties {
    }
 
    public synchronized void decode(final ByteBuf buffer,
-                                   final SimpleString.Interner keyInterner,
-                                   final StringValue.Interner valueInterner) {
+                                   final TypedPropertiesDecoderPools keyValuePools) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -346,15 +345,7 @@ public class TypedProperties {
          size = 0;
 
          for (int i = 0; i < numHeaders; i++) {
-            final SimpleString key;
-            int len = buffer.readInt();
-            if (keyInterner != null) {
-               key = keyInterner.intern(buffer, len);
-            } else {
-               byte[] data = new byte[len];
-               buffer.readBytes(data);
-               key = new SimpleString(data);
-            }
+            final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
 
             byte type = buffer.readByte();
 
@@ -412,12 +403,7 @@ public class TypedProperties {
                   break;
                }
                case STRING: {
-                  if (valueInterner != null) {
-                     final int length = buffer.readInt();
-                     val = valueInterner.intern(buffer, length);
-                  } else {
-                     val = new StringValue(buffer);
-                  }
+                  val = StringValue.readStringValue(buffer, keyValuePools == null ? null : keyValuePools.getPropertyValuesPool());
                   doPutValue(key, val);
                   break;
                }
@@ -430,7 +416,7 @@ public class TypedProperties {
    }
 
    public synchronized void decode(final ByteBuf buffer) {
-      decode(buffer, null, null);
+      decode(buffer, null);
    }
 
    public synchronized void encode(final ByteBuf buffer) {
@@ -901,25 +887,61 @@ public class TypedProperties {
 
    public static final class StringValue extends PropertyValue {
 
-      public static final class Interner extends AbstractInterner<StringValue> {
+      final SimpleString val;
+
+      private StringValue(final SimpleString val) {
+         this.val = val;
+      }
+
+      static StringValue readStringValue(final ByteBuf byteBuf, ByteBufStringValuePool pool) {
+         if (pool == null) {
+            return new StringValue(SimpleString.readSimpleString(byteBuf));
+         } else {
+            return pool.getOrCreate(byteBuf);
+         }
+      }
+
+      @Override
+      public Object getValue() {
+         return val;
+      }
+
+      @Override
+      public void write(final ByteBuf buffer) {
+         buffer.writeByte(DataConstants.STRING);
+         SimpleString.writeSimpleString(buffer, val);
+      }
+
+      @Override
+      public int encodeSize() {
+         return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
+      }
+
+      public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
+
+         private static final int UUID_LENGTH = 36;
 
          private final int maxLength;
 
-         public Interner(final int capacity, final int maxCharsLength) {
+         public ByteBufStringValuePool() {
+            this.maxLength = UUID_LENGTH;
+         }
+
+         public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
             super(capacity);
             this.maxLength = maxCharsLength;
          }
 
          @Override
          protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
-            if (entry == null) {
+            if (entry == null || entry.val == null) {
                return false;
             }
-            return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+            return entry.val.equals(byteBuf, offset, length);
          }
 
          @Override
-         protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+         protected boolean canPool(final ByteBuf byteBuf, final int length) {
             assert length % 2 == 0 : "length must be a multiple of 2";
             final int expectedStringLength = length >> 1;
             return expectedStringLength <= maxLength;
@@ -930,31 +952,53 @@ public class TypedProperties {
             return new StringValue(SimpleString.readSimpleString(byteBuf, length));
          }
       }
+   }
 
-      final SimpleString val;
+   public static class TypedPropertiesDecoderPools {
 
-      private StringValue(final SimpleString val) {
-         this.val = val;
+      private SimpleString.ByteBufSimpleStringPool propertyKeysPool;
+      private TypedProperties.StringValue.ByteBufStringValuePool propertyValuesPool;
+
+      public TypedPropertiesDecoderPools() {
+         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool();
+         this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
       }
 
-      private StringValue(final ByteBuf buffer) {
-         val = SimpleString.readSimpleString(buffer);
+      public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
+         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
+         this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
       }
 
-      @Override
-      public Object getValue() {
-         return val;
+      public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
+         return propertyKeysPool;
       }
 
-      @Override
-      public void write(final ByteBuf buffer) {
-         buffer.writeByte(DataConstants.STRING);
-         SimpleString.writeSimpleString(buffer, val);
+      public TypedProperties.StringValue.ByteBufStringValuePool getPropertyValuesPool() {
+         return propertyValuesPool;
       }
+   }
 
-      @Override
-      public int encodeSize() {
-         return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
+   public static class TypedPropertiesStringSimpleStringPools {
+
+      private SimpleString.StringSimpleStringPool propertyKeysPool;
+      private SimpleString.StringSimpleStringPool propertyValuesPool;
+
+      public TypedPropertiesStringSimpleStringPools() {
+         this.propertyKeysPool = new SimpleString.StringSimpleStringPool();
+         this.propertyValuesPool = new SimpleString.StringSimpleStringPool();
+      }
+
+      public TypedPropertiesStringSimpleStringPools(int keyPoolCapacity, int valuePoolCapacity) {
+         this.propertyKeysPool = new SimpleString.StringSimpleStringPool(keyPoolCapacity);
+         this.propertyValuesPool = new SimpleString.StringSimpleStringPool(valuePoolCapacity);
+      }
+
+      public SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+         return propertyKeysPool;
+      }
+
+      public SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+         return propertyValuesPool;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index ddb8a3b..d24cd95 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
 /**
@@ -587,6 +588,8 @@ public interface Message {
 
    Message putStringProperty(SimpleString key, SimpleString value);
 
+   Message putStringProperty(SimpleString key, String value);
+
    /**
     * Returns the size of the <em>encoded</em> message.
     */
@@ -649,6 +652,9 @@ public interface Message {
    /** This should make you convert your message into Core format. */
    ICoreMessage toCore();
 
+   /** This should make you convert your message into Core format. */
+   ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);
+
    int getMemoryEstimate();
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 91fb6ca..8068aa9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -59,6 +60,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
    public ClientMessageImpl() {
    }
 
+   public ClientMessageImpl(CoreMessageObjectPools coreMessageObjectPools) {
+      super(coreMessageObjectPools);
+   }
+
    protected ClientMessageImpl(ClientMessageImpl other) {
       super(other);
    }
@@ -96,11 +101,22 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
                             final long expiration,
                             final long timestamp,
                             final byte priority,
-                            final int initialMessageBufferSize) {
+                            final int initialMessageBufferSize,
+                            final CoreMessageObjectPools coreMessageObjectPools) {
+      super(coreMessageObjectPools);
       this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
            setPriority(priority).initBuffer(initialMessageBufferSize);
    }
 
+   public ClientMessageImpl(final byte type,
+                            final boolean durable,
+                            final long expiration,
+                            final long timestamp,
+                            final byte priority,
+                            final int initialMessageBufferSize) {
+      this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null);
+   }
+
    @Override
    public TypedProperties getProperties() {
       return this.checkProperties();
@@ -286,6 +302,11 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
    }
 
    @Override
+   public ClientMessageImpl putStringProperty(final SimpleString key, final String value) {
+      return (ClientMessageImpl) super.putStringProperty(key, value);
+   }
+
+   @Override
    public ClientMessageImpl putObjectProperty(final SimpleString key,
                                               final Object value) throws ActiveMQPropertyConversionException {
       return (ClientMessageImpl) super.putObjectProperty(key, value);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 61784ad..b5f8a1b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -148,6 +149,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    private final Executor closeExecutor;
 
+   private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
                      final String name,
                      final String username,
@@ -869,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                                       final long expiration,
                                       final long timestamp,
                                       final byte priority) {
-      return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize);
+      return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize, coreMessageObjectPools);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 4ebf97e..888b785 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -93,18 +93,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    protected volatile TypedProperties properties;
 
-   private final SimpleString.Interner keysInterner;
-   private final TypedProperties.StringValue.Interner valuesInterner;
+   private final CoreMessageObjectPools coreMessageObjectPools;
 
-   public CoreMessage(final SimpleString.Interner keysInterner,
-                      final TypedProperties.StringValue.Interner valuesInterner) {
-      this.keysInterner = keysInterner;
-      this.valuesInterner = valuesInterner;
+   public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
+      this.coreMessageObjectPools = coreMessageObjectPools;
    }
 
    public CoreMessage() {
-      this.keysInterner = null;
-      this.valuesInterner = null;
+      this.coreMessageObjectPools = null;
    }
 
    /** On core there's no delivery annotation */
@@ -326,10 +322,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    public CoreMessage(long id, int bufferSize) {
+      this(id, bufferSize, null);
+   }
+
+   public CoreMessage(long id, int bufferSize, CoreMessageObjectPools coreMessageObjectPools) {
       this.initBuffer(bufferSize);
       this.setMessageID(id);
-      this.keysInterner = null;
-      this.valuesInterner = null;
+      this.coreMessageObjectPools = coreMessageObjectPools;
    }
 
    protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -343,8 +342,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       this.timestamp = other.timestamp;
       this.priority = other.priority;
       this.userID = other.userID;
-      this.keysInterner = other.keysInterner;
-      this.valuesInterner = other.valuesInterner;
+      this.coreMessageObjectPools = other.coreMessageObjectPools;
       if (copyProperties != null) {
          this.properties = new TypedProperties(copyProperties);
       }
@@ -424,7 +422,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    @Override
    public CoreMessage setValidatedUserID(String validatedUserID) {
-      putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID));
+      putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool()));
       return this;
    }
 
@@ -479,7 +477,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          TypedProperties properties = new TypedProperties();
          if (buffer != null && propertiesLocation >= 0) {
             final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
-            properties.decode(byteBuf, keysInterner, valuesInterner);
+            properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
          }
          this.properties = properties;
       }
@@ -543,17 +541,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
-      int b = buffer.readByte();
-      if (b != DataConstants.NULL) {
-         final int length = buffer.readInt();
-         if (keysInterner != null) {
-            address = keysInterner.intern(buffer, length);
-         } else {
-            address = SimpleString.readSimpleString(buffer, length);
-         }
-      } else {
-         address = null;
-      }
+
+      address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -571,7 +560,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties();
-         properties.decode(buffer, keysInterner, valuesInterner);
+         properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
       }
    }
 
@@ -671,7 +660,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public CoreMessage setAddress(String address) {
       messageChanged();
-      this.address = SimpleString.toSimpleString(address);
+      this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
       return this;
    }
 
@@ -703,7 +692,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putBooleanProperty(final String key, final boolean value) {
       messageChanged();
       checkProperties();
-      properties.putBooleanProperty(new SimpleString(key), value);
+      properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -724,7 +713,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getBooleanProperty(new SimpleString(key));
+      return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -739,7 +728,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putByteProperty(final String key, final byte value) {
       messageChanged();
       checkProperties();
-      properties.putByteProperty(new SimpleString(key), value);
+      properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
 
       return this;
    }
@@ -752,7 +741,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    @Override
    public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
-      return getByteProperty(SimpleString.toSimpleString(key));
+      return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -768,7 +757,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putBytesProperty(final String key, final byte[] value) {
       messageChanged();
       checkProperties();
-      properties.putBytesProperty(new SimpleString(key), value);
+      properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -780,7 +769,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    @Override
    public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
-      return getBytesProperty(new SimpleString(key));
+      return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -795,7 +784,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putCharProperty(String key, char value) {
       messageChanged();
       checkProperties();
-      properties.putCharProperty(new SimpleString(key), value);
+      properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -811,7 +800,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putShortProperty(final String key, final short value) {
       messageChanged();
       checkProperties();
-      properties.putShortProperty(new SimpleString(key), value);
+      properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -827,7 +816,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putIntProperty(final String key, final int value) {
       messageChanged();
       checkProperties();
-      properties.putIntProperty(new SimpleString(key), value);
+      properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -854,7 +843,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putLongProperty(final String key, final long value) {
       messageChanged();
       checkProperties();
-      properties.putLongProperty(new SimpleString(key), value);
+      properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -882,7 +871,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putFloatProperty(final String key, final float value) {
       messageChanged();
       checkProperties();
-      properties.putFloatProperty(new SimpleString(key), value);
+      properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -898,7 +887,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public CoreMessage putDoubleProperty(final String key, final double value) {
       messageChanged();
       checkProperties();
-      properties.putDoubleProperty(new SimpleString(key), value);
+      properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -924,10 +913,19 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
+   public CoreMessage putStringProperty(final SimpleString key, final String value) {
+      messageChanged();
+      checkProperties();
+      properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool()));
+      return this;
+   }
+
+
+   @Override
    public CoreMessage putStringProperty(final String key, final String value) {
       messageChanged();
       checkProperties();
-      properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
+      properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool()));
       return this;
    }
 
@@ -943,7 +941,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public Object getObjectProperty(final String key) {
       checkProperties();
-      return getObjectProperty(SimpleString.toSimpleString(key));
+      return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -955,7 +953,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
       messageChanged();
-      putObjectProperty(new SimpleString(key), value);
+      putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
       return this;
    }
 
@@ -968,7 +966,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getShortProperty(new SimpleString(key));
+      return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -980,7 +978,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getFloatProperty(new SimpleString(key));
+      return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -996,7 +994,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    @Override
    public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
-      return getStringProperty(new SimpleString(key));
+      return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -1008,7 +1006,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getSimpleStringProperty(new SimpleString(key));
+      return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -1025,7 +1023,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public Object removeProperty(final String key) {
       messageChanged();
       checkProperties();
-      Object oldValue = properties.removeProperty(new SimpleString(key));
+      Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
       if (oldValue != null) {
          messageChanged();
       }
@@ -1041,7 +1039,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    @Override
    public boolean containsProperty(final String key) {
       checkProperties();
-      return properties.containsProperty(new SimpleString(key));
+      return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
    }
 
    @Override
@@ -1116,6 +1114,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
+   public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
+      return this;
+   }
+
+   @Override
    public String toString() {
       try {
          checkProperties();
@@ -1135,4 +1138,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          return new java.util.Date(timestamp).toString();
       }
    }
+
+   private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+      return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
+   }
+
+   private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+      return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
new file mode 100644
index 0000000..d4e3ed1
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message.impl;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+
+public class CoreMessageObjectPools {
+
+   private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
+   private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+
+   private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+   private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+   private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
+
+   public CoreMessageObjectPools() {
+   }
+
+   public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
+      return addressDecoderPool.get();
+   }
+
+   public SimpleString.StringSimpleStringPool getAddressStringSimpleStringPool() {
+      return addressStringSimpleStringPool.get();
+   }
+
+   public SimpleString.StringSimpleStringPool getGroupIdStringSimpleStringPool() {
+      return groupIdStringSimpleStringPool.get();
+   }
+
+   public TypedProperties.TypedPropertiesDecoderPools getPropertiesDecoderPools() {
+      return propertiesDecoderPools.get();
+   }
+
+   public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
+      return propertiesStringSimpleStringPools.get();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 787e499..ad8c7a9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@@ -32,11 +33,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 public class ClientPacketDecoder extends PacketDecoder {
 
    private static final long serialVersionUID = 6952614096979334582L;
-   public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
-
-   protected ClientPacketDecoder() {
-
-   }
+   protected final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
    @Override
    public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
@@ -56,9 +53,9 @@ public class ClientPacketDecoder extends PacketDecoder {
       switch (packetType) {
          case SESS_RECEIVE_MSG: {
             if (connection.isVersionBeforeAddressChange()) {
-               packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
+               packet = new SessionReceiveMessage_1X(new ClientMessageImpl(coreMessageObjectPools));
             } else {
-               packet = new SessionReceiveMessage(new ClientMessageImpl());
+               packet = new SessionReceiveMessage(new ClientMessageImpl(coreMessageObjectPools));
             }
             break;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index f0005ff..c58a0bd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -511,7 +511,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
    }
 
    protected PacketDecoder createPacketDecoder() {
-      return ClientPacketDecoder.INSTANCE;
+      return new ClientPacketDecoder();
    }
 
    private void forceReturnChannel1(ActiveMQException cause) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 2660f96..e8f5920 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -109,12 +109,18 @@ public class MessageUtil {
       return message.getSimpleStringProperty(REPLYTO_HEADER_NAME);
    }
 
-   public static void setJMSReplyTo(Message message, final SimpleString dest) {
-
+   public static void setJMSReplyTo(Message message, final String dest) {
       if (dest == null) {
          message.removeProperty(REPLYTO_HEADER_NAME);
       } else {
+         message.putStringProperty(REPLYTO_HEADER_NAME, dest);
+      }
+   }
 
+   public static void setJMSReplyTo(Message message, final SimpleString dest) {
+      if (dest == null) {
+         message.removeProperty(REPLYTO_HEADER_NAME);
+      } else {
          message.putStringProperty(REPLYTO_HEADER_NAME, dest);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index ec94011..310b4ed 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -337,7 +337,7 @@ public class CoreMessageTest {
 
    public String generate(String body) throws Exception {
 
-      ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024);
+      ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, null);
       TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
 
       message.setAddress(ADDRESS);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 626dd4d..7750564 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -99,26 +99,28 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
       }
    }
 
-   public static String createQueueNameForSubscription(final boolean isDurable,
+   public static SimpleString createQueueNameForSubscription(final boolean isDurable,
                                                        final String clientID,
                                                        final String subscriptionName) {
+      final String queueName;
       if (clientID != null) {
          if (isDurable) {
-            return ActiveMQDestination.escape(clientID) + SEPARATOR +
+            queueName = ActiveMQDestination.escape(clientID) + SEPARATOR +
                ActiveMQDestination.escape(subscriptionName);
          } else {
-            return "nonDurable" + SEPARATOR +
+            queueName = "nonDurable" + SEPARATOR +
                ActiveMQDestination.escape(clientID) + SEPARATOR +
                ActiveMQDestination.escape(subscriptionName);
          }
       } else {
          if (isDurable) {
-            return ActiveMQDestination.escape(subscriptionName);
+            queueName = ActiveMQDestination.escape(subscriptionName);
          } else {
-            return "nonDurable" + SEPARATOR +
+            queueName = "nonDurable" + SEPARATOR +
                ActiveMQDestination.escape(subscriptionName);
          }
       }
+      return SimpleString.toSimpleString(queueName);
    }
 
    public static String createQueueNameForSharedSubscription(final boolean isDurable,
@@ -192,10 +194,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
       return new ActiveMQQueue(address);
    }
 
+   public static ActiveMQQueue createQueue(final SimpleString address) {
+      return new ActiveMQQueue(address);
+   }
+
    public static ActiveMQTopic createTopic(final String address) {
       return new ActiveMQTopic(address);
    }
 
+   public static ActiveMQTopic createTopic(final SimpleString address) {
+      return new ActiveMQTopic(address);
+   }
+
    public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) {
       return new ActiveMQTemporaryQueue(address, session);
    }