You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/01/17 08:33:55 UTC
[1/6] activemq-artemis git commit: ARTEMIS-1586 Added String Pools
unit tests
Repository: activemq-artemis
Updated Branches:
refs/heads/master 00bd989f9 -> 9fb8c3c47
ARTEMIS-1586 Added String Pools unit tests
- SimpleString::toSimpleString String pooling test
- ByteBufSimpleStringPool test
- StringSimpleStringPool test
- ByteBufStringValuePool test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a3c41818
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a3c41818
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a3c41818
Branch: refs/heads/master
Commit: a3c418183aa7f8e7fe92d4db551b9b15777aabb7
Parents: 98028cd
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Jan 15 13:56:46 2018 +0100
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100
----------------------------------------------------------------------
.../artemis/utils/TypedPropertiesTest.java | 29 ++++++++++++
.../artemis/tests/util/SimpleStringTest.java | 49 ++++++++++++++++++++
2 files changed, 78 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a3c41818/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 38144c9..ea044ac 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.utils;
import java.util.Iterator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -226,4 +228,31 @@ public class TypedPropertiesTest {
props = new TypedProperties();
key = RandomUtil.randomSimpleString();
}
+
+ @Test
+ public void testByteBufStringValuePool() {
+ final int capacity = 8;
+ final int chars = Integer.toString(capacity).length();
+ final TypedProperties.StringValue.ByteBufStringValuePool pool = new TypedProperties.StringValue.ByteBufStringValuePool(capacity, chars);
+ final int bytes = new SimpleString(Integer.toString(capacity)).sizeof();
+ final ByteBuf bb = Unpooled.buffer(bytes, bytes);
+ for (int i = 0; i < capacity; i++) {
+ final SimpleString s = new SimpleString(Integer.toString(i));
+ bb.resetWriterIndex();
+ SimpleString.writeSimpleString(bb, s);
+ bb.resetReaderIndex();
+ final TypedProperties.StringValue expectedPooled = pool.getOrCreate(bb);
+ bb.resetReaderIndex();
+ Assert.assertSame(expectedPooled, pool.getOrCreate(bb));
+ }
+ }
+
+ @Test
+ public void testByteBufStringValuePoolTooLong() {
+ final SimpleString tooLong = new SimpleString("aa");
+ final ByteBuf bb = Unpooled.buffer(tooLong.sizeof(), tooLong.sizeof());
+ SimpleString.writeSimpleString(bb, tooLong);
+ final TypedProperties.StringValue.ByteBufStringValuePool pool = new TypedProperties.StringValue.ByteBufStringValuePool(1, tooLong.length() - 1);
+ Assert.assertNotSame(pool.getOrCreate(bb), pool.getOrCreate(bb.resetReaderIndex()));
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a3c41818/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
index b367015..054b186 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SimpleStringTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.util;
import java.util.concurrent.CountDownLatch;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -394,4 +396,51 @@ public class SimpleStringTest extends Assert {
}
}
+ @Test
+ public void testToSimpleStringPoolStringArgument() throws Exception {
+ final String s = "pooled";
+ final SimpleString ss = SimpleString.toSimpleString(s);
+ final String s1 = ss.toString();
+ Assert.assertSame("SimpleString::toSimpleString is not pooling the given String", s, s1);
+ }
+
+ @Test
+ public void testByteBufSimpleStringPool() {
+ final int capacity = 8;
+ final int chars = Integer.toString(capacity).length();
+ final SimpleString.ByteBufSimpleStringPool pool = new SimpleString.ByteBufSimpleStringPool(capacity, chars);
+ final int bytes = new SimpleString(Integer.toString(capacity)).sizeof();
+ final ByteBuf bb = Unpooled.buffer(bytes, bytes);
+ for (int i = 0; i < capacity; i++) {
+ final SimpleString s = new SimpleString(Integer.toString(i));
+ bb.resetWriterIndex();
+ SimpleString.writeSimpleString(bb, s);
+ bb.resetReaderIndex();
+ final SimpleString expectedPooled = pool.getOrCreate(bb);
+ bb.resetReaderIndex();
+ Assert.assertSame(expectedPooled, pool.getOrCreate(bb));
+ }
+ }
+
+ @Test
+ public void testByteBufSimpleStringPoolTooLong() {
+ final SimpleString tooLong = new SimpleString("aa");
+ final ByteBuf bb = Unpooled.buffer(tooLong.sizeof(), tooLong.sizeof());
+ SimpleString.writeSimpleString(bb, tooLong);
+ final SimpleString.ByteBufSimpleStringPool pool = new SimpleString.ByteBufSimpleStringPool(1, tooLong.length() - 1);
+ Assert.assertNotSame(pool.getOrCreate(bb), pool.getOrCreate(bb.resetReaderIndex()));
+ }
+
+ @Test
+ public void testStringSimpleStringPool() throws Exception {
+ final int capacity = 8;
+ final SimpleString.StringSimpleStringPool pool = new SimpleString.StringSimpleStringPool(capacity);
+ for (int i = 0; i < capacity; i++) {
+ final String s = Integer.toString(i);
+ final SimpleString expectedPooled = pool.getOrCreate(s);
+ Assert.assertSame(expectedPooled, pool.getOrCreate(s));
+ }
+ }
+
+
}
[6/6] activemq-artemis git commit: This closes #1757 ARTEMIS-1586
Reduce GC pressure due to String allocations on Core protocol
Posted by mi...@apache.org.
This closes #1757 ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9fb8c3c4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9fb8c3c4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9fb8c3c4
Branch: refs/heads/master
Commit: 9fb8c3c470eb9a27fccf286314f5083082c4ba89
Parents: 00bd989 a3c4181
Author: Michael Pearce <mi...@me.com>
Authored: Wed Jan 17 09:33:42 2018 +0100
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:42 2018 +0100
----------------------------------------------------------------------
.../activemq/artemis/api/core/SimpleString.java | 121 ++++++++++++--
.../artemis/utils/AbstractByteBufPool.java | 164 +++++++++++++++++++
.../activemq/artemis/utils/AbstractPool.java | 89 ++++++++++
.../apache/activemq/artemis/utils/ByteUtil.java | 122 ++++++++++++++
.../utils/collections/TypedProperties.java | 110 ++++++++++++-
.../artemis/utils/TypedPropertiesTest.java | 29 ++++
.../activemq/artemis/api/core/Message.java | 6 +
.../core/client/impl/ClientMessageImpl.java | 23 ++-
.../core/client/impl/ClientSessionImpl.java | 5 +-
.../artemis/core/message/impl/CoreMessage.java | 91 ++++++----
.../message/impl/CoreMessageObjectPools.java | 55 +++++++
.../core/protocol/ClientPacketDecoder.java | 7 +-
.../impl/ActiveMQClientProtocolManager.java | 6 +-
.../activemq/artemis/reader/MessageUtil.java | 10 +-
.../artemis/message/CoreMessageTest.java | 2 +-
.../artemis/jms/client/ActiveMQDestination.java | 20 ++-
.../artemis/jms/client/ActiveMQMessage.java | 55 ++++---
.../artemis/jms/client/ActiveMQQueue.java | 8 +-
.../artemis/jms/client/ActiveMQSession.java | 6 +-
.../jms/client/ActiveMQStreamMessage.java | 2 +-
.../artemis/jms/client/ActiveMQTopic.java | 8 +-
.../protocol/amqp/broker/AMQPMessage.java | 74 ++++++---
.../amqp/broker/AMQPSessionCallback.java | 105 ++++++------
.../protocol/amqp/converter/AMQPConverter.java | 5 +-
.../amqp/converter/AMQPMessageSupport.java | 49 +++---
.../amqp/converter/AmqpCoreConverter.java | 37 +++--
.../proton/ProtonServerReceiverContext.java | 16 +-
.../amqp/proton/ProtonServerSenderContext.java | 54 +++---
.../JMSMappingOutboundTransformerTest.java | 4 +-
.../artemis/core/protocol/mqtt/MQTTSession.java | 8 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 2 +-
.../artemis/core/protocol/mqtt/MQTTUtil.java | 19 ++-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../openwire/OpenWireMessageConverter.java | 8 +-
.../core/protocol/openwire/OpenwireMessage.java | 11 ++
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
.../core/protocol/openwire/amq/AMQSession.java | 11 +-
.../core/protocol/stomp/StompSession.java | 2 +-
.../ra/inflow/ActiveMQMessageHandler.java | 2 +-
.../core/protocol/ServerPacketDecoder.java | 8 +-
.../protocol/core/impl/CoreProtocolManager.java | 2 +-
.../protocol/core/impl/CoreSessionCallback.java | 11 +-
...ctiveMQServerSideProtocolManagerFactory.java | 4 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../spi/core/protocol/MessageConverter.java | 3 +-
.../spi/core/protocol/SessionCallback.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 11 ++
.../artemis/tests/util/SimpleStringTest.java | 49 ++++++
.../integration/client/AcknowledgeTest.java | 11 ++
.../integration/client/HangConsumerTest.java | 2 +-
.../persistence/XmlImportExportTest.java | 2 +-
51 files changed, 1157 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
[5/6] activemq-artemis git commit: ARTEMIS-1586 Reduce GC pressure
due to String allocations on Core protocol
Posted by mi...@apache.org.
ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol
The commit contains:
- a general purpose interner implementation
- StringValue/SimpleString internrs specializations
- TypedProperties keys/values string interning for SessionSendMessage decoding
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d776edd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d776edd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d776edd
Branch: refs/heads/master
Commit: 8d776eddfcc12bfc73771c04e376583c9fa221e1
Parents: 00bd989
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Jan 4 15:22:05 2018 +0100
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100
----------------------------------------------------------------------
.../activemq/artemis/api/core/SimpleString.java | 138 +++++++++++++++-
.../artemis/utils/AbstractInterner.java | 157 +++++++++++++++++++
.../utils/collections/TypedProperties.java | 60 ++++++-
.../artemis/core/message/impl/CoreMessage.java | 36 ++++-
.../core/protocol/ClientPacketDecoder.java | 4 +
.../impl/ActiveMQClientProtocolManager.java | 4 +-
.../core/protocol/ServerPacketDecoder.java | 29 +++-
.../protocol/core/impl/CoreProtocolManager.java | 2 +-
...ctiveMQServerSideProtocolManagerFactory.java | 4 +-
9 files changed, 404 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 79909c7..e24e245 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.utils.AbstractInterner;
import org.apache.activemq.artemis.utils.DataConstants;
/**
@@ -31,6 +33,129 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
+ public static final class Interner extends AbstractInterner<SimpleString> {
+
+ private final int maxLength;
+
+ public Interner(final int capacity, final int maxCharsLength) {
+ super(capacity);
+ this.maxLength = maxCharsLength;
+ }
+
+ @Override
+ protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
+ return SimpleString.isEqual(entry, byteBuf, offset, length);
+ }
+
+ @Override
+ protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+ assert length % 2 == 0 : "length must be a multiple of 2";
+ final int expectedStringLength = length >> 1;
+ return expectedStringLength <= maxLength;
+ }
+
+ @Override
+ protected SimpleString create(final ByteBuf byteBuf, final int length) {
+ return readSimpleString(byteBuf, length);
+ }
+ }
+
+ /**
+ * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
+ * {@code false} otherwise.
+ * <p>
+ * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+ * length field.
+ */
+ public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) {
+ if (s == null) {
+ return false;
+ }
+ final byte[] chars = s.getData();
+ if (chars.length != length)
+ return false;
+ if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+ if ((offset + length) > bytes.writerIndex()) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (bytes.hasArray()) {
+ return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length);
+ } else if (bytes.hasMemoryAddress()) {
+ return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
+ }
+ }
+ return byteBufIsEqual(chars, bytes, offset, length);
+ }
+
+ private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) {
+ for (int i = 0; i < length; i++)
+ if (chars[i] != bytes.getByte(offset + i))
+ return false;
+ return true;
+ }
+
+ private static boolean batchOnHeapIsEqual(final byte[] chars,
+ final byte[] array,
+ final int arrayOffset,
+ final int length) {
+ final int longCount = length >>> 3;
+ final int bytesCount = length & 7;
+ int bytesIndex = arrayOffset;
+ int charsIndex = 0;
+ for (int i = 0; i < longCount; i++) {
+ final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+ final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
+ if (charsLong != bytesLong) {
+ return false;
+
+ }
+ bytesIndex += 8;
+ charsIndex += 8;
+ }
+ for (int i = 0; i < bytesCount; i++) {
+ final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+ final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
+ if (charsByte != bytesByte) {
+ return false;
+
+ }
+ bytesIndex++;
+ charsIndex++;
+ }
+ return true;
+ }
+
+ private static boolean batchOffHeapIsEqual(final byte[] chars,
+ final long address,
+ final int offset,
+ final int length) {
+ final int longCount = length >>> 3;
+ final int bytesCount = length & 7;
+ long bytesAddress = address + offset;
+ int charsIndex = 0;
+ for (int i = 0; i < longCount; i++) {
+ final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+ final long bytesLong = PlatformDependent.getLong(bytesAddress);
+ if (charsLong != bytesLong) {
+ return false;
+
+ }
+ bytesAddress += 8;
+ charsIndex += 8;
+ }
+ for (int i = 0; i < bytesCount; i++) {
+ final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+ final byte bytesByte = PlatformDependent.getByte(bytesAddress);
+ if (charsByte != bytesByte) {
+ return false;
+
+ }
+ bytesAddress++;
+ charsIndex++;
+ }
+ return true;
+ }
+
private static final long serialVersionUID = 4204223851422244307L;
// Attributes
@@ -134,7 +259,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return subSeq(start, end);
}
-
public static SimpleString readNullableSimpleString(ByteBuf buffer) {
int b = buffer.readByte();
if (b == DataConstants.NULL) {
@@ -143,13 +267,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return readSimpleString(buffer);
}
-
public static SimpleString readSimpleString(ByteBuf buffer) {
int len = buffer.readInt();
- if (len > buffer.readableBytes()) {
- throw new IndexOutOfBoundsException();
- }
- byte[] data = new byte[len];
+ return readSimpleString(buffer, len);
+ }
+
+ public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
+ byte[] data = new byte[length];
buffer.readBytes(data);
return new SimpleString(data);
}
@@ -169,8 +293,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
buffer.writeBytes(data);
}
-
-
public SimpleString subSeq(final int start, final int end) {
int len = data.length >> 1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
new file mode 100644
index 0000000..7e1fe40
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractInterner<T> {
+
+ private final T[] entries;
+ private final int mask;
+ private final int shift;
+
+ public AbstractInterner(final int capacity) {
+ entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+ mask = entries.length - 1;
+ //log2 of entries.length
+ shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+ }
+
+ /**
+ * Batch hash code implementation that works at its best if {@code bytes}
+ * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
+ */
+ private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
+ if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+ //if the platform allows it, the hash code could be computed without bounds checking
+ if (bytes.hasArray()) {
+ return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
+ } else if (bytes.hasMemoryAddress()) {
+ return offHeapHashCode(bytes.memoryAddress(), offset, length);
+ }
+ }
+ return byteBufHashCode(bytes, offset, length);
+ }
+
+ private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
+ final int intCount = length >>> 1;
+ final int byteCount = length & 1;
+ int hashCode = 1;
+ int arrayIndex = offset;
+ for (int i = 0; i < intCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
+ arrayIndex += 2;
+ }
+ for (int i = 0; i < byteCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
+ }
+ return hashCode;
+ }
+
+ private static int offHeapHashCode(final long address, final int offset, final int length) {
+ final int intCount = length >>> 1;
+ final int byteCount = length & 1;
+ int hashCode = 1;
+ int arrayIndex = offset;
+ for (int i = 0; i < intCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
+ arrayIndex += 2;
+ }
+ for (int i = 0; i < byteCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
+ }
+ return hashCode;
+ }
+
+ private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
+ final int intCount = length >>> 1;
+ final int byteCount = length & 1;
+ int hashCode = 1;
+ int arrayIndex = offset;
+ for (int i = 0; i < intCount; i++) {
+ final short shortLE = byteBuf.getShortLE(arrayIndex);
+ final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
+ hashCode = 31 * hashCode + nativeShort;
+ arrayIndex += 2;
+ }
+ for (int i = 0; i < byteCount; i++) {
+ hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+ }
+ return hashCode;
+ }
+
+ /**
+ * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned,
+ * {@code false} otherwise.
+ */
+ protected abstract boolean canIntern(ByteBuf byteBuf, int length);
+
+ /**
+ * Create a new entry.
+ */
+ protected abstract T create(ByteBuf byteBuf, int length);
+
+ /**
+ * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
+ * and {@code length} {@code false} otherwise.
+ */
+ protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
+
+ /**
+ * Returns and interned entry if possible, a new one otherwise.
+ * <p>
+ * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
+ */
+ public final T intern(final ByteBuf byteBuf, final int length) {
+ if (!canIntern(byteBuf, length)) {
+ return create(byteBuf, length);
+ } else {
+ if (!byteBuf.isReadable(length)) {
+ throw new IndexOutOfBoundsException();
+ }
+ final int bytesOffset = byteBuf.readerIndex();
+ final int hashCode = hashCode(byteBuf, bytesOffset, length);
+ //fast % operation with power of 2 entries.length
+ final int firstIndex = hashCode & mask;
+ final T firstEntry = entries[firstIndex];
+ if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+ byteBuf.skipBytes(length);
+ return firstEntry;
+ }
+ final int secondIndex = (hashCode >> shift) & mask;
+ final T secondEntry = entries[secondIndex];
+ if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+ byteBuf.skipBytes(length);
+ return secondEntry;
+ }
+ final T internedEntry = create(byteBuf, length);
+ final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+ entries[entryIndex] = internedEntry;
+ return internedEntry;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index b17156e..a3e4876 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
+import org.apache.activemq.artemis.utils.AbstractInterner;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -94,6 +95,7 @@ public class TypedProperties {
public void putByteProperty(final SimpleString key, final byte value) {
checkCreateProperties();
+ checkCreateProperties();
doPutValue(key, ByteValue.valueOf(value));
}
@@ -329,7 +331,9 @@ public class TypedProperties {
}
}
- public synchronized void decode(final ByteBuf buffer) {
+ public synchronized void decode(final ByteBuf buffer,
+ final SimpleString.Interner keyInterner,
+ final StringValue.Interner valueInterner) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
@@ -342,10 +346,15 @@ public class TypedProperties {
size = 0;
for (int i = 0; i < numHeaders; i++) {
+ final SimpleString key;
int len = buffer.readInt();
- byte[] data = new byte[len];
- buffer.readBytes(data);
- SimpleString key = new SimpleString(data);
+ if (keyInterner != null) {
+ key = keyInterner.intern(buffer, len);
+ } else {
+ byte[] data = new byte[len];
+ buffer.readBytes(data);
+ key = new SimpleString(data);
+ }
byte type = buffer.readByte();
@@ -403,7 +412,12 @@ public class TypedProperties {
break;
}
case STRING: {
- val = new StringValue(buffer);
+ if (valueInterner != null) {
+ final int length = buffer.readInt();
+ val = valueInterner.intern(buffer, length);
+ } else {
+ val = new StringValue(buffer);
+ }
doPutValue(key, val);
break;
}
@@ -415,6 +429,10 @@ public class TypedProperties {
}
}
+ public synchronized void decode(final ByteBuf buffer) {
+ decode(buffer, null, null);
+ }
+
public synchronized void encode(final ByteBuf buffer) {
if (properties == null) {
buffer.writeByte(DataConstants.NULL);
@@ -881,7 +899,37 @@ public class TypedProperties {
}
}
- private static final class StringValue extends PropertyValue {
+ public static final class StringValue extends PropertyValue {
+
+ public static final class Interner extends AbstractInterner<StringValue> {
+
+ private final int maxLength;
+
+ public Interner(final int capacity, final int maxCharsLength) {
+ super(capacity);
+ this.maxLength = maxCharsLength;
+ }
+
+ @Override
+ protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
+ if (entry == null) {
+ return false;
+ }
+ return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+ }
+
+ @Override
+ protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+ assert length % 2 == 0 : "length must be a multiple of 2";
+ final int expectedStringLength = length >> 1;
+ return expectedStringLength <= maxLength;
+ }
+
+ @Override
+ protected StringValue create(final ByteBuf byteBuf, final int length) {
+ return new StringValue(SimpleString.readSimpleString(byteBuf, length));
+ }
+ }
final SimpleString val;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index b0656b6..4ebf97e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Set;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -42,8 +43,6 @@ import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
-import io.netty.buffer.ByteBuf;
-
/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
* consumers */
public class CoreMessage extends RefCountMessage implements ICoreMessage {
@@ -94,7 +93,18 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
protected volatile TypedProperties properties;
+ private final SimpleString.Interner keysInterner;
+ private final TypedProperties.StringValue.Interner valuesInterner;
+
+ public CoreMessage(final SimpleString.Interner keysInterner,
+ final TypedProperties.StringValue.Interner valuesInterner) {
+ this.keysInterner = keysInterner;
+ this.valuesInterner = valuesInterner;
+ }
+
public CoreMessage() {
+ this.keysInterner = null;
+ this.valuesInterner = null;
}
/** On core there's no delivery annotation */
@@ -318,6 +328,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage(long id, int bufferSize) {
this.initBuffer(bufferSize);
this.setMessageID(id);
+ this.keysInterner = null;
+ this.valuesInterner = null;
}
protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -331,6 +343,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
this.timestamp = other.timestamp;
this.priority = other.priority;
this.userID = other.userID;
+ this.keysInterner = other.keysInterner;
+ this.valuesInterner = other.valuesInterner;
if (copyProperties != null) {
this.properties = new TypedProperties(copyProperties);
}
@@ -464,7 +478,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
if (properties == null) {
TypedProperties properties = new TypedProperties();
if (buffer != null && propertiesLocation >= 0) {
- properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+ final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
+ properties.decode(byteBuf, keysInterner, valuesInterner);
}
this.properties = properties;
}
@@ -528,8 +543,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong();
-
- address = SimpleString.readNullableSimpleString(buffer);
+ int b = buffer.readByte();
+ if (b != DataConstants.NULL) {
+ final int length = buffer.readInt();
+ if (keysInterner != null) {
+ address = keysInterner.intern(buffer, length);
+ } else {
+ address = SimpleString.readSimpleString(buffer, length);
+ }
+ } else {
+ address = null;
+ }
if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16];
buffer.readBytes(bytes);
@@ -547,7 +571,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex();
} else {
properties = new TypedProperties();
- properties.decode(buffer);
+ properties.decode(buffer, keysInterner, valuesInterner);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 1022030..787e499 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -34,6 +34,10 @@ public class ClientPacketDecoder extends PacketDecoder {
private static final long serialVersionUID = 6952614096979334582L;
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
+ protected ClientPacketDecoder() {
+
+ }
+
@Override
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final byte packetType = in.readByte();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 93432b8..f0005ff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -409,7 +409,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
List<Interceptor> incomingInterceptors,
List<Interceptor> outgoingInterceptors,
TopologyResponseHandler topologyResponseHandler) {
- this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
+ this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
this.topologyResponseHandler = topologyResponseHandler;
@@ -510,7 +510,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
}
}
- protected PacketDecoder getPacketDecoder() {
+ protected PacketDecoder createPacketDecoder() {
return ClientPacketDecoder.INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 0584476..2276fdb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -83,16 +85,34 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerPacketDecoder extends ClientPacketDecoder {
+ private static final int UUID_LENGTH = 36;
+ private static final int DEFAULT_INTERNER_CAPACITY = 32;
private static final long serialVersionUID = 3348673114388400766L;
- public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
+ private SimpleString.Interner keysInterner;
+ private TypedProperties.StringValue.Interner valuesInterner;
- private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
+ public ServerPacketDecoder() {
+ this.keysInterner = null;
+ this.valuesInterner = null;
+ }
+
+ private void initializeInternersIfNeeded() {
+ if (this.keysInterner == null) {
+ this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+ }
+ if (this.valuesInterner == null) {
+ this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+ }
+ }
+
+ private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage;
+ initializeInternersIfNeeded();
if (connection.isVersionBeforeAddressChange()) {
- sendMessage = new SessionSendMessage_1X(new CoreMessage());
+ sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
} else {
- sendMessage = new SessionSendMessage(new CoreMessage());
+ sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
}
sendMessage.decode(in);
@@ -259,5 +279,4 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
return packet;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index c9262fa..af9e131 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
Executor connectionExecutor = server.getExecutorFactory().getExecutor();
- final CoreRemotingConnection rc = new RemotingConnectionImpl(ServerPacketDecoder.INSTANCE, connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
+ final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
index 85ad3a3..209f68f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
@@ -65,8 +65,8 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
class ActiveMQReplicationProtocolManager extends ActiveMQClientProtocolManager {
@Override
- protected PacketDecoder getPacketDecoder() {
- return ServerPacketDecoder.INSTANCE;
+ protected PacketDecoder createPacketDecoder() {
+ return new ServerPacketDecoder();
}
}
}
[2/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more
generic
Posted by mi...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 75faa97..57865b7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -154,7 +154,7 @@ public class AMQConsumer {
}
addressInfo.setInternal(internalAddress);
if (isDurable) {
- queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
+ queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName);
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isExists()) {
// Already exists
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 64d1353..bca7eae 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -27,6 +27,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
@@ -82,6 +83,8 @@ public class AMQSession implements SessionCallback {
private final OpenWireProtocolManager protocolManager;
+ private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -295,7 +298,7 @@ public class AMQSession implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
// TODO Auto-generated method stub
}
@@ -315,7 +318,7 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
- org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
+ org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
@@ -338,12 +341,12 @@ public class AMQSession implements SessionCallback {
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
- SimpleString address = new SimpleString(dest.getPhysicalName());
+ SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
- checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+ checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 18e0b10..0a12b47 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -239,7 +239,7 @@ public class StompSession implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
if (stompSubscription != null) {
StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 9133cdf..f343ec9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -111,7 +111,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
// Create the message consumer
SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable()) {
- SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
+ SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName());
QueueQuery subResponse = session.queueQuery(queueName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 2276fdb..d38f45f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
-import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -85,34 +83,15 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerPacketDecoder extends ClientPacketDecoder {
- private static final int UUID_LENGTH = 36;
- private static final int DEFAULT_INTERNER_CAPACITY = 32;
private static final long serialVersionUID = 3348673114388400766L;
- private SimpleString.Interner keysInterner;
- private TypedProperties.StringValue.Interner valuesInterner;
-
- public ServerPacketDecoder() {
- this.keysInterner = null;
- this.valuesInterner = null;
- }
-
- private void initializeInternersIfNeeded() {
- if (this.keysInterner == null) {
- this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
- }
- if (this.valuesInterner == null) {
- this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
- }
- }
private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage;
- initializeInternersIfNeeded();
if (connection.isVersionBeforeAddressChange()) {
- sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
+ sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
} else {
- sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
+ sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
}
sendMessage.decode(in);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 8b281eb..f53d028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
@@ -48,6 +49,8 @@ public final class CoreSessionCallback implements SessionCallback {
private ServerSessionPacketHandler handler;
+ private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
public CoreSessionCallback(String name,
ProtocolManager protocolManager,
Channel channel,
@@ -115,9 +118,9 @@ public final class CoreSessionCallback implements SessionCallback {
Packet packet;
if (channel.getConnection().isVersionBeforeAddressChange()) {
- packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
+ packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
} else {
- packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
+ packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
}
int size = 0;
@@ -159,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
channel.send(new DisconnectConsumerMessage(consumerId.getID()));
} else {
- ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName);
+ ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 5dc1b93..15b1465 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1045,7 +1045,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void disconnect() {
- callback.disconnect(this, getQueue().getName().toString());
+ callback.disconnect(this, getQueue().getName());
}
public float getRate() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a440e31..2c81343 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -18,10 +18,11 @@ package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
public interface MessageConverter<ProtocolMessage extends Message> {
- ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
+ ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception;
ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ae1612f..c4a2dbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -81,7 +81,7 @@ public interface SessionCallback {
void closed();
- void disconnect(ServerConsumer consumerId, String queueName);
+ void disconnect(ServerConsumer consumerId, SimpleString queueName);
boolean isWritable(ReadyListener callback, Object protocolContext);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2707190..5cfac12 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
@@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Override
public CoreMessage toCore() {
+ return toCore(null);
+ }
+
+ @Override
+ public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null;
}
@@ -591,6 +597,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public Message putStringProperty(SimpleString key, String value) {
+ return null;
+ }
+
+ @Override
public Message putStringProperty(String key, String value) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index d7c9855..078c397 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
@Override
public ICoreMessage toCore() {
+ return toCore(null);
+ }
+
+ @Override
+ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null;
}
@@ -648,6 +654,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
+ public Message putStringProperty(SimpleString key, String value) {
+ return null;
+ }
+
+ @Override
public Message putStringProperty(String key, String value) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 2f25480..dc57a12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
+ public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
//To change body of implemented methods use File | Settings | File Templates.
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index 790ed82..aaf29b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
msg.putStringProperty("myNonAsciiStringProperty", international.toString());
msg.putStringProperty("mySpecialCharacters", special);
msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i));
- msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null);
+ msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null);
producer.send(msg);
}
[3/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more
generic
Posted by mi...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 896a8ed..6e28c0e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -372,7 +372,7 @@ public class ActiveMQMessage implements javax.jms.Message {
public void setJMSReplyTo(final Destination dest) throws JMSException {
if (dest == null) {
- MessageUtil.setJMSReplyTo(message, null);
+ MessageUtil.setJMSReplyTo(message, (String) null);
replyTo = null;
} else {
if (dest instanceof ActiveMQDestination == false) {
@@ -391,7 +391,7 @@ public class ActiveMQMessage implements javax.jms.Message {
}
ActiveMQDestination jbd = (ActiveMQDestination) dest;
- MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress()));
+ MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
replyTo = jbd;
}
@@ -401,14 +401,15 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddressSimpleString();
- String prefix = "";
- if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
- prefix = QUEUE_QUALIFIED_PREFIX;
+ if (address == null) {
+ dest = null;
+ } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
+ dest = ActiveMQDestination.createQueue(address);
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
- prefix = TOPIC_QUALIFIED_PREFIX;
+ dest = ActiveMQDestination.createTopic(address);
+ } else {
+ dest = ActiveMQDestination.fromPrefixedName(address.toString());
}
-
- dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
}
return dest;
@@ -513,7 +514,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public boolean getBooleanProperty(final String name) throws JMSException {
try {
- return message.getBooleanProperty(new SimpleString(name));
+ return message.getBooleanProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -522,7 +523,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public byte getByteProperty(final String name) throws JMSException {
try {
- return message.getByteProperty(new SimpleString(name));
+ return message.getByteProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -531,7 +532,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public short getShortProperty(final String name) throws JMSException {
try {
- return message.getShortProperty(new SimpleString(name));
+ return message.getShortProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -544,7 +545,7 @@ public class ActiveMQMessage implements javax.jms.Message {
}
try {
- return message.getIntProperty(new SimpleString(name));
+ return message.getIntProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -557,7 +558,7 @@ public class ActiveMQMessage implements javax.jms.Message {
}
try {
- return message.getLongProperty(new SimpleString(name));
+ return message.getLongProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -566,7 +567,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public float getFloatProperty(final String name) throws JMSException {
try {
- return message.getFloatProperty(new SimpleString(name));
+ return message.getFloatProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -575,7 +576,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public double getDoubleProperty(final String name) throws JMSException {
try {
- return message.getDoubleProperty(new SimpleString(name));
+ return message.getDoubleProperty(name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -593,7 +594,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (MessageUtil.JMSXUSERID.equals(name)) {
return message.getValidatedUserID();
} else {
- return message.getStringProperty(new SimpleString(name));
+ return message.getStringProperty(name);
}
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
@@ -608,7 +609,7 @@ public class ActiveMQMessage implements javax.jms.Message {
Object val = message.getObjectProperty(name);
if (val instanceof SimpleString) {
- val = ((SimpleString) val).toString();
+ val = val.toString();
}
return val;
}
@@ -622,43 +623,43 @@ public class ActiveMQMessage implements javax.jms.Message {
public void setBooleanProperty(final String name, final boolean value) throws JMSException {
checkProperty(name);
- message.putBooleanProperty(new SimpleString(name), value);
+ message.putBooleanProperty(name, value);
}
@Override
public void setByteProperty(final String name, final byte value) throws JMSException {
checkProperty(name);
- message.putByteProperty(new SimpleString(name), value);
+ message.putByteProperty(name, value);
}
@Override
public void setShortProperty(final String name, final short value) throws JMSException {
checkProperty(name);
- message.putShortProperty(new SimpleString(name), value);
+ message.putShortProperty(name, value);
}
@Override
public void setIntProperty(final String name, final int value) throws JMSException {
checkProperty(name);
- message.putIntProperty(new SimpleString(name), value);
+ message.putIntProperty(name, value);
}
@Override
public void setLongProperty(final String name, final long value) throws JMSException {
checkProperty(name);
- message.putLongProperty(new SimpleString(name), value);
+ message.putLongProperty(name, value);
}
@Override
public void setFloatProperty(final String name, final float value) throws JMSException {
checkProperty(name);
- message.putFloatProperty(new SimpleString(name), value);
+ message.putFloatProperty(name, value);
}
@Override
public void setDoubleProperty(final String name, final double value) throws JMSException {
checkProperty(name);
- message.putDoubleProperty(new SimpleString(name), value);
+ message.putDoubleProperty(name, value);
}
@Override
@@ -670,7 +671,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
return;
} else {
- message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value));
+ message.putStringProperty(name, value);
}
}
@@ -703,7 +704,7 @@ public class ActiveMQMessage implements javax.jms.Message {
}
try {
- message.putObjectProperty(new SimpleString(name), value);
+ message.putObjectProperty(name, value);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@@ -964,7 +965,7 @@ public class ActiveMQMessage implements javax.jms.Message {
boolean result = false;
if (jmsPropertyName.equals(name)) {
- message.putStringProperty(corePropertyName, SimpleString.toSimpleString(value.toString()));
+ message.putStringProperty(corePropertyName, value.toString());
result = true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
index 2deefa9..ff4ee0f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
import javax.jms.Queue;
+import org.apache.activemq.artemis.api.core.SimpleString;
+
/**
* ActiveMQ Artemis implementation of a JMS Queue.
* <br>
@@ -34,13 +36,17 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
// Constructors --------------------------------------------------
public ActiveMQQueue() {
- this(null);
+ this((SimpleString) null);
}
public ActiveMQQueue(final String address) {
super(address, TYPE.QUEUE, null);
}
+ public ActiveMQQueue(final SimpleString address) {
+ super(address, TYPE.QUEUE, null);
+ }
+
public ActiveMQQueue(final String address, boolean temporary) {
super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 374a985..cf2ec59 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
}
- queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
+ queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName);
if (durability == ConsumerDurability.DURABLE) {
try {
@@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
}
- queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName));
+ queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName);
QueueQuery subResponse = session.queueQuery(queueName);
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
}
- SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name));
+ SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name);
try {
QueueQuery response = session.queueQuery(queueName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
index 2762a9c..1c70c5b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
@@ -73,7 +73,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
// For testing only
public ActiveMQStreamMessage() {
- message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500);
+ message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500, null);
}
// Public --------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
index e22e67b..4dbefec 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
import javax.jms.Topic;
+import org.apache.activemq.artemis.api.core.SimpleString;
+
/**
* ActiveMQ Artemis implementation of a JMS Topic.
* <br>
@@ -33,13 +35,17 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
// Constructors --------------------------------------------------
public ActiveMQTopic() {
- this(null);
+ this((SimpleString) null);
}
public ActiveMQTopic(final String address) {
this(address, false);
}
+ public ActiveMQTopic(final SimpleString address) {
+ super(address, TYPE.TOPIC, null);
+ }
+
public ActiveMQTopic(final String address, boolean temporary) {
super(address, TYPE.TOPIC, null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 2bdd88a..cdab412 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
@@ -70,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
boolean bufferValid;
Boolean durable;
long messageID;
- String address;
+ SimpleString address;
MessageImpl protonMessage;
private volatile int memoryEstimate = -1;
private long expiration = 0;
@@ -90,6 +91,7 @@ public class AMQPMessage extends RefCountMessage {
private ApplicationProperties applicationProperties;
private long scheduledTime = -1;
private String connectionID;
+ private final CoreMessageObjectPools coreMessageObjectPools;
Set<Object> rejectedConsumers;
@@ -98,9 +100,14 @@ public class AMQPMessage extends RefCountMessage {
private volatile TypedProperties extraProperties;
public AMQPMessage(long messageFormat, byte[] data) {
+ this(messageFormat, data, null);
+ }
+
+ public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
this.data = Unpooled.wrappedBuffer(data);
this.messageFormat = messageFormat;
this.bufferValid = true;
+ this.coreMessageObjectPools = coreMessageObjectPools;
parseHeaders();
}
@@ -108,12 +115,14 @@ public class AMQPMessage extends RefCountMessage {
public AMQPMessage(long messageFormat) {
this.messageFormat = messageFormat;
this.bufferValid = false;
+ this.coreMessageObjectPools = null;
}
public AMQPMessage(long messageFormat, Message message) {
this.messageFormat = messageFormat;
this.protonMessage = (MessageImpl) message;
this.bufferValid = false;
+ this.coreMessageObjectPools = null;
}
public AMQPMessage(Message message) {
@@ -301,7 +310,7 @@ public class AMQPMessage extends RefCountMessage {
parseHeaders();
if (_properties != null && _properties.getGroupId() != null) {
- return SimpleString.toSimpleString(_properties.getGroupId());
+ return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
} else {
return null;
}
@@ -588,36 +597,33 @@ public class AMQPMessage extends RefCountMessage {
@Override
public String getAddress() {
- if (address == null) {
- Properties properties = getProtonMessage().getProperties();
- if (properties != null) {
- return properties.getTo();
- } else {
- return null;
- }
- } else {
- return address;
- }
+ SimpleString addressSimpleString = getAddressSimpleString();
+ return addressSimpleString == null ? null : addressSimpleString.toString();
}
@Override
public AMQPMessage setAddress(String address) {
- this.address = address;
+ this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
return this;
}
@Override
public AMQPMessage setAddress(SimpleString address) {
- if (address != null) {
- return setAddress(address.toString());
- } else {
- return setAddress((String) null);
- }
+ this.address = address;
+ return this;
}
@Override
public SimpleString getAddressSimpleString() {
- return SimpleString.toSimpleString(getAddress());
+ if (address == null) {
+ Properties properties = getProtonMessage().getProperties();
+ if (properties != null) {
+ setAddress(properties.getTo());
+ } else {
+ return null;
+ }
+ }
+ return address;
}
@Override
@@ -977,7 +983,7 @@ public class AMQPMessage extends RefCountMessage {
if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties);
if (_properties != null) {
if (address != null) {
- _properties.setTo(address);
+ _properties.setTo(address.toString());
}
getProtonMessage().setProperties(this._properties);
}
@@ -987,7 +993,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
- return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
+ return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool());
}
@Override
@@ -1066,10 +1072,15 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
+ public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
+ return putStringProperty(key.toString(), value);
+ }
+
+ @Override
public Set<SimpleString> getPropertyNames() {
HashSet<SimpleString> values = new HashSet<>();
for (Object k : getApplicationPropertiesMap().keySet()) {
- values.add(SimpleString.toSimpleString(k.toString()));
+ values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
}
return values;
}
@@ -1084,17 +1095,22 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
- public ICoreMessage toCore() {
+ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
try {
- return AMQPConverter.getInstance().toCore(this);
+ return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
+ public ICoreMessage toCore() {
+ return toCore(null);
+ }
+
+ @Override
public SimpleString getLastValueProperty() {
- return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString());
+ return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
}
@Override
@@ -1155,4 +1171,12 @@ public class AMQPMessage extends RefCountMessage {
", address=" + getAddress() +
"]";
}
+
+ private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+ return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
+ }
+
+ private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+ return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 19348f4..7134d3b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -101,6 +102,7 @@ public class AMQPSessionCallback implements SessionCallback {
private final AtomicBoolean draining = new AtomicBoolean(false);
+ private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
@@ -210,14 +212,14 @@ public class AMQPSessionCallback implements SessionCallback {
}
public Object createSender(ProtonServerSenderContext protonSender,
- String queue,
+ SimpleString queue,
String filter,
boolean browserOnly) throws Exception {
long consumerID = consumerIDGenerator.generateID();
filter = SelectorTranslator.convertToActiveMQFilterString(filter);
- ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null);
+ ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
// AMQP handles its own flow control for when it's started
consumer.setStarted(true);
@@ -233,48 +235,48 @@ public class AMQPSessionCallback implements SessionCallback {
serverConsumer.receiveCredits(-1);
}
- public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
+ public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception {
+ serverSession.createQueue(queueName, queueName, routingType, null, true, false);
}
- public void createTemporaryQueue(String address,
- String queueName,
+ public void createTemporaryQueue(SimpleString address,
+ SimpleString queueName,
RoutingType routingType,
- String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
+ SimpleString filter) throws Exception {
+ serverSession.createQueue(address, queueName, routingType, filter, true, false);
}
- public void createUnsharedDurableQueue(String address,
+ public void createUnsharedDurableQueue(SimpleString address,
RoutingType routingType,
- String queueName,
- String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
+ SimpleString queueName,
+ SimpleString filter) throws Exception {
+ serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false);
}
- public void createSharedDurableQueue(String address,
+ public void createSharedDurableQueue(SimpleString address,
RoutingType routingType,
- String queueName,
- String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
+ SimpleString queueName,
+ SimpleString filter) throws Exception {
+ serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
}
- public void createSharedVolatileQueue(String address,
+ public void createSharedVolatileQueue(SimpleString address,
RoutingType routingType,
- String queueName,
- String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
+ SimpleString queueName,
+ SimpleString filter) throws Exception {
+ serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
}
- public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
- QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+ public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
+ QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);
if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try {
- serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true);
+ serverSession.createQueue(queueName, queueName, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
- queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+ queueQueryResult = serverSession.executeQueueQuery(queueName);
}
// if auto-create we will return whatever type was used before
@@ -287,32 +289,31 @@ public class AMQPSessionCallback implements SessionCallback {
- public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
+ public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception {
BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address);
if (bindingQueryResult != null) {
return bindingQueryResult.isExists();
}
- SimpleString simpleAddress = SimpleString.toSimpleString(address);
- bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+ bindingQueryResult = serverSession.executeBindingQuery(address);
if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
try {
- serverSession.createAddress(simpleAddress, routingType, true);
+ serverSession.createAddress(address, routingType, true);
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
- bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+ bindingQueryResult = serverSession.executeBindingQuery(address);
} else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
- QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress);
+ QueueQueryResult queueBinding = serverSession.executeQueueQuery(address);
if (!queueBinding.isExists()) {
try {
- serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
+ serverSession.createQueue(address, address, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
}
- bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+ bindingQueryResult = serverSession.executeBindingQuery(address);
}
bindingQueryCache.setResult(address, bindingQueryResult);
@@ -320,7 +321,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
- public AddressQueryResult addressQuery(String addressName,
+ public AddressQueryResult addressQuery(SimpleString addressName,
RoutingType routingType,
boolean autoCreate) throws Exception {
@@ -329,15 +330,15 @@ public class AMQPSessionCallback implements SessionCallback {
return addressQueryResult;
}
- addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+ addressQueryResult = serverSession.executeAddressQuery(addressName);
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
try {
- serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
+ serverSession.createAddress(addressName, routingType, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
- addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+ addressQueryResult = serverSession.executeAddressQuery(addressName);
}
addressQueryCache.setResult(addressName, addressQueryResult);
@@ -438,15 +439,15 @@ public class AMQPSessionCallback implements SessionCallback {
final Transaction transaction,
final Receiver receiver,
final Delivery delivery,
- String address,
+ SimpleString address,
int messageFormat,
byte[] data) throws Exception {
- AMQPMessage message = new AMQPMessage(messageFormat, data);
+ AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
if (address != null) {
- message.setAddress(new SimpleString(address));
+ message.setAddress(address);
} else {
// Anonymous relay must set a To value
- address = message.getAddress();
+ address = message.getAddressSimpleString();
if (address == null) {
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
@@ -552,7 +553,7 @@ public class AMQPSessionCallback implements SessionCallback {
});
}
- public void offerProducerCredit(final String address,
+ public void offerProducerCredit(final SimpleString address,
final int credits,
final int threshold,
final Receiver receiver) {
@@ -567,7 +568,7 @@ public class AMQPSessionCallback implements SessionCallback {
connection.flush();
return;
}
- final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
+ final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(new Runnable() {
@Override
public void run() {
@@ -587,8 +588,8 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- public void deleteQueue(String queueName) throws Exception {
- manager.getServer().destroyQueue(new SimpleString(queueName));
+ public void deleteQueue(SimpleString queueName) throws Exception {
+ manager.getServer().destroyQueue(queueName);
}
public void resetContext(OperationContext oldContext) {
@@ -657,7 +658,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumer, String queueName) {
+ public void disconnect(ServerConsumer consumer, SimpleString queueName) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
connection.lock();
try {
@@ -703,12 +704,12 @@ public class AMQPSessionCallback implements SessionCallback {
return serverSession.getAddress(address);
}
- public void removeTemporaryQueue(String address) throws Exception {
- serverSession.deleteQueue(SimpleString.toSimpleString(address));
+ public void removeTemporaryQueue(SimpleString address) throws Exception {
+ serverSession.deleteQueue(address);
}
- public RoutingType getDefaultRoutingType(String address) {
- return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType();
+ public RoutingType getDefaultRoutingType(SimpleString address) {
+ return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType();
}
public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
@@ -733,10 +734,10 @@ public class AMQPSessionCallback implements SessionCallback {
class AddressQueryCache<T> {
- String address;
+ SimpleString address;
T result;
- public synchronized T getResult(String parameterAddress) {
+ public synchronized T getResult(SimpleString parameterAddress) {
if (address != null && address.equals(parameterAddress)) {
return result;
} else {
@@ -746,7 +747,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- public synchronized void setResult(String parameterAddress, T result) {
+ public synchronized void setResult(SimpleString parameterAddress, T result) {
this.address = parameterAddress;
this.result = result;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
index 724474b..e67fc67 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@@ -38,7 +39,7 @@ public class AMQPConverter implements MessageConverter<AMQPMessage> {
}
@Override
- public ICoreMessage toCore(AMQPMessage messageSource) throws Exception {
- return AmqpCoreConverter.toCore(messageSource);
+ public ICoreMessage toCore(AMQPMessage messageSource, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+ return AmqpCoreConverter.toCore(messageSource, coreMessageObjectPools);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index da2f4e0..1bac1e5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -242,56 +243,56 @@ public final class AMQPMessageSupport {
return null;
}
- public static ServerJMSBytesMessage createBytesMessage(long id) {
- return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE));
+ public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+ return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools));
}
- public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException {
- ServerJMSBytesMessage message = createBytesMessage(id);
+ public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+ ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools);
message.writeBytes(array, arrayOffset, length);
return message;
}
- public static ServerJMSStreamMessage createStreamMessage(long id) {
- return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE));
+ public static ServerJMSStreamMessage createStreamMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+ return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE, coreMessageObjectPools));
}
- public static ServerJMSMessage createMessage(long id) {
- return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE));
+ public static ServerJMSMessage createMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+ return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE, coreMessageObjectPools));
}
- public static ServerJMSTextMessage createTextMessage(long id) {
- return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE));
+ public static ServerJMSTextMessage createTextMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+ return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools));
}
- public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException {
- ServerJMSTextMessage message = createTextMessage(id);
+ public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+ ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools);
message.setText(text);
return message;
}
- public static ServerJMSObjectMessage createObjectMessage(long id) {
- return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE));
+ public static ServerJMSObjectMessage createObjectMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+ return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools));
}
- public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException {
- ServerJMSObjectMessage message = createObjectMessage(id);
+ public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+ ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
message.setSerializedForm(serializedForm);
return message;
}
- public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException {
- ServerJMSObjectMessage message = createObjectMessage(id);
+ public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+ ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
message.setSerializedForm(new Binary(array, offset, length));
return message;
}
- public static ServerJMSMapMessage createMapMessage(long id) {
- return new ServerJMSMapMessage(newMessage(id, MAP_TYPE));
+ public static ServerJMSMapMessage createMapMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+ return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools));
}
- public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException {
- ServerJMSMapMessage message = createMapMessage(id);
+ public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+ ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools);
final Set<Map.Entry<String, Object>> set = content.entrySet();
for (Map.Entry<String, Object> entry : set) {
Object value = entry.getValue();
@@ -304,8 +305,8 @@ public final class AMQPMessageSupport {
return message;
}
- private static CoreMessage newMessage(long id, byte messageType) {
- CoreMessage message = new CoreMessage(id, 512);
+ private static CoreMessage newMessage(long id, byte messageType, CoreMessageObjectPools coreMessageObjectPools) {
+ CoreMessage message = new CoreMessage(id, 512, coreMessageObjectPools);
message.setType(messageType);
// ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index fbaf0ef..80969f6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -52,6 +52,7 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -89,31 +90,31 @@ import io.netty.buffer.PooledByteBufAllocator;
public class AmqpCoreConverter {
@SuppressWarnings("unchecked")
- public static ICoreMessage toCore(AMQPMessage message) throws Exception {
+ public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
Section body = message.getProtonMessage().getBody();
ServerJMSMessage result;
if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
- result = createObjectMessage(message.getMessageID());
+ result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
- result = createBytesMessage(message.getMessageID());
+ result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
if (charset != null) {
- result = createTextMessage(message.getMessageID());
+ result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
} else {
- result = createMessage(message.getMessageID());
+ result = createMessage(message.getMessageID(), coreMessageObjectPools);
}
}
} else if (body instanceof Data) {
Binary payload = ((Data) body).getValue();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
- result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
if (StandardCharsets.UTF_8.equals(charset)) {
@@ -121,18 +122,18 @@ public class AmqpCoreConverter {
try {
CharBuffer chars = charset.newDecoder().decode(buf);
- result = createTextMessage(message.getMessageID(), String.valueOf(chars));
+ result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
}
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
- ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+ ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
@@ -141,31 +142,31 @@ public class AmqpCoreConverter {
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) {
- result = createTextMessage(message.getMessageID(), (String) value);
+ result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
} else if (value instanceof Binary) {
Binary payload = (Binary) value;
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
- result = createObjectMessage(message.getMessageID(), payload);
+ result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
} else {
- result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else if (value instanceof List) {
- ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+ ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
result = m;
} else if (value instanceof Map) {
- result = createMapMessage(message.getMessageID(), (Map<String, Object>) value);
+ result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
} else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body);
- result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex());
+ result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally {
buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@@ -186,7 +187,7 @@ public class AmqpCoreConverter {
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
- result.getInnerMessage().setAddress(message.getAddress());
+ result.getInnerMessage().setAddress(message.getAddressSimpleString());
result.encode();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 3e1c0fe..3c35d76 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -54,7 +54,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final Receiver receiver;
- protected String address;
+ protected SimpleString address;
protected final AMQPSessionCallback sessionSPI;
@@ -102,7 +102,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (target.getDynamic()) {
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
- address = sessionSPI.tempQueueName();
+ address = SimpleString.toSimpleString(sessionSPI.tempQueueName());
defRoutingType = getRoutingType(target.getCapabilities(), address);
try {
@@ -113,12 +113,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
- target.setAddress(address);
+ target.setAddress(address.toString());
} else {
// the target will have an address unless the remote is requesting an anonymous
// relay in which case the address in the incoming message's to field will be
// matched on receive of the message.
- address = target.getAddress();
+ address = SimpleString.toSimpleString(target.getAddress());
if (address != null && !address.isEmpty()) {
defRoutingType = getRoutingType(target.getCapabilities(), address);
@@ -134,7 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
try {
- sessionSPI.check(SimpleString.toSimpleString(address), CheckType.SEND, new SecurityAuth() {
+ sessionSPI.check(address, CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
String username = null;
@@ -181,12 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(amqpCredits, minCreditRefresh);
}
- public RoutingType getRoutingType(Receiver receiver, String address) {
+ public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address);
}
- private RoutingType getRoutingType(Symbol[] symbols, String address) {
+ private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
if (symbols != null) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@@ -264,7 +264,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try {
- sessionSPI.removeTemporaryQueue(target.getAddress());
+ sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(target.getAddress()));
} catch (Exception e) {
//ignore on close, its temp anyway and will be removed later
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fbaae8a..1823168 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -102,7 +102,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean shared = false;
private boolean global = false;
private boolean isVolatile = false;
- private String tempQueueName;
+ private SimpleString tempQueueName;
public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
@@ -157,7 +157,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
super.initialise();
Source source = (Source) sender.getRemoteSource();
- String queue = null;
+ SimpleString queue = null;
String selector = null;
final Map<Symbol, Object> supportedFilters = new HashMap<>();
@@ -199,7 +199,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// the lifetime policy and capabilities of the new subscription.
if (result.isExists()) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
- source.setAddress(queue);
+ source.setAddress(queue.toString());
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY);
@@ -240,7 +240,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else if (source.getDynamic()) {
// if dynamic we have to create the node (queue) and set the address on the target, the
// node is temporary and will be deleted on closing of the session
- queue = java.util.UUID.randomUUID().toString();
+ queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
tempQueueName = queue;
try {
sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
@@ -248,7 +248,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
- source.setAddress(queue);
+ source.setAddress(queue.toString());
} else {
SimpleString addressToUse;
SimpleString queueNameToUse = null;
@@ -269,7 +269,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
multicast = hasCapabilities(TOPIC, source);
AddressQueryResult addressQueryResult = null;
try {
- addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
+ addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) {
@@ -294,7 +294,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// if not we look up the address
AddressQueryResult addressQueryResult = null;
try {
- addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
+ addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) {
@@ -333,6 +333,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
+ SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
//if the address specifies a broker configured queue then we always use this, treat it as a queue
if (queue != null) {
@@ -345,24 +346,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
String pubId = sender.getName();
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
-
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics).
- if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+ if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
- sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+ sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
}
}
} else {
if (shared) {
- sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+ sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} else {
- sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+ sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
}
}
} else {
@@ -371,15 +371,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (shared && sender.getName() != null) {
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
try {
- sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+ sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} catch (ActiveMQQueueExistsException e) {
//this is ok, just means its shared
}
} else {
- queue = java.util.UUID.randomUUID().toString();
+ queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
tempQueueName = queue;
try {
- sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
+ sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
@@ -387,18 +387,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
if (queueNameToUse != null) {
- SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST));
+ SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
if (matchingAnycastQueue != null) {
- queue = matchingAnycastQueue.toString();
+ queue = matchingAnycastQueue;
} else {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
} else {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
if (matchingAnycastQueue != null) {
- queue = matchingAnycastQueue.toString();
+ queue = matchingAnycastQueue;
} else {
- queue = addressToUse.toString();
+ queue = addressToUse;
}
}
@@ -437,16 +437,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
- private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
+ private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
if (queueName != null) {
- QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false);
+ QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false);
if (!result.isExists()) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
} else {
if (!result.getAddress().equals(address)) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
}
- return sessionSPI.getMatchingQueue(address, queueName, routingType).toString();
+ return sessionSPI.getMatchingQueue(address, queueName, routingType);
}
}
return null;
@@ -495,7 +495,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (remoteLinkClose) {
Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && multicast) {
- String queueName = source.getAddress();
+ SimpleString queueName = SimpleString.toSimpleString(source.getAddress());
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName);
@@ -508,7 +508,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0];
}
- String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
+ SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@@ -518,7 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try {
- sessionSPI.removeTemporaryQueue(source.getAddress());
+ sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
} catch (Exception e) {
//ignore on close, its temp anyway and will be removed later
}
@@ -760,7 +760,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false;
}
- private static String createQueueName(boolean useCoreSubscriptionNaming,
+ private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
String clientId,
String pubId,
boolean shared,
@@ -784,7 +784,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
queue += ":global";
}
}
- return queue;
+ return SimpleString.toSimpleString(queue);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index d06464f..ccafd37 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -620,7 +620,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
- ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0);
+ ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null);
if (compression) {
// TODO
@@ -647,7 +647,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
- ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0);
+ ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null);
if (compression) {
// TODO
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 73dbeaa..da10f47 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -56,11 +57,12 @@ public class MQTTSession {
private MQTTProtocolManager protocolManager;
-
private boolean isClean;
private WildcardConfiguration wildcardConfiguration;
+ private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
public MQTTSession(MQTTProtocolHandler protocolHandler,
MQTTConnection connection,
MQTTProtocolManager protocolManager,
@@ -195,4 +197,8 @@ public class MQTTSession {
public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) {
this.wildcardConfiguration = wildcardConfiguration;
}
+
+ public CoreMessageObjectPools getCoreMessageObjectPools() {
+ return coreMessageObjectPools;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 21b1f2b..39e2ba9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -78,7 +78,7 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
- public void disconnect(ServerConsumer consumer, String queueName) {
+ public void disconnect(ServerConsumer consumer, SimpleString queueName) {
try {
consumer.removeItself();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 2cb1f7e..2667f81 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -64,13 +64,13 @@ public class MQTTUtil {
public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
- public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
+ public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level");
- public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id";
+ public static final SimpleString MQTT_MESSAGE_ID_KEY = SimpleString.toSimpleString("mqtt.message.id");
- public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
+ public static final SimpleString MQTT_MESSAGE_TYPE_KEY = SimpleString.toSimpleString("mqtt.message.type");
- public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
+ public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain");
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
@@ -113,10 +113,10 @@ public class MQTTUtil {
int qos) {
long id = session.getServer().getStorageManager().generateID();
- CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+ CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE, session.getCoreMessageObjectPools());
message.setAddress(address);
message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
- message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+ message.putIntProperty(MQTT_QOS_LEVEL_KEY, qos);
message.setType(Message.BYTES_TYPE);
return message;
}
@@ -127,7 +127,8 @@ public class MQTTUtil {
int qos,
ByteBuf payload) {
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
- ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+ SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
+ ICoreMessage message = createServerMessage(session, address, retain, qos);
message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
return message;
@@ -135,8 +136,8 @@ public class MQTTUtil {
public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
Message message = createServerMessage(session, address, false, 1);
- message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
- message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
+ message.putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId);
+ message.putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value());
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 9923953..86a95db 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1121,7 +1121,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
- SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+ SimpleString subQueueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName());
server.destroyQueue(subQueueName);
return null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 6af9997..83ff6d6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -108,10 +109,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
@Override
- public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception {
+ public ICoreMessage toCore(OpenwireMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return null;
}
+
// @Override
public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
// TODO: implement this
@@ -119,10 +121,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
// @Override
- public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
+ public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
Message messageSend = (Message) message;
- CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
+ CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
String type = messageSend.getType();
if (type != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index d28eda4..c63fe19 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
// TODO: Implement this
@@ -442,6 +443,11 @@ public class OpenwireMessage implements Message {
}
@Override
+ public Message putStringProperty(SimpleString key, String value) {
+ return null;
+ }
+
+ @Override
public int getEncodeSize() {
return 0;
}
@@ -478,6 +484,11 @@ public class OpenwireMessage implements Message {
@Override
public ICoreMessage toCore() {
+ return toCore(null);
+ }
+
+ @Override
+ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null;
}
[4/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more
generic
Posted by mi...@apache.org.
ARTEMIS-1586 Refactor to make more generic
* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits)
* Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where.
* reduce SimpleString creation in conversion to/from core message methods with JMS wrapper.
* reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98028cde
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98028cde
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98028cde
Branch: refs/heads/master
Commit: 98028cdecc6bd31c86a7d6decfed1961e46be7b2
Parents: 8d776ed
Author: Michael André Pearce <mi...@me.com>
Authored: Wed Jan 10 08:48:14 2018 +0000
Committer: Michael Pearce <mi...@me.com>
Committed: Wed Jan 17 09:33:41 2018 +0100
----------------------------------------------------------------------
.../activemq/artemis/api/core/SimpleString.java | 237 ++++++++-----------
.../artemis/utils/AbstractByteBufPool.java | 164 +++++++++++++
.../artemis/utils/AbstractInterner.java | 157 ------------
.../activemq/artemis/utils/AbstractPool.java | 89 +++++++
.../apache/activemq/artemis/utils/ByteUtil.java | 122 ++++++++++
.../utils/collections/TypedProperties.java | 122 +++++++---
.../activemq/artemis/api/core/Message.java | 6 +
.../core/client/impl/ClientMessageImpl.java | 23 +-
.../core/client/impl/ClientSessionImpl.java | 5 +-
.../artemis/core/message/impl/CoreMessage.java | 107 +++++----
.../message/impl/CoreMessageObjectPools.java | 55 +++++
.../core/protocol/ClientPacketDecoder.java | 11 +-
.../impl/ActiveMQClientProtocolManager.java | 2 +-
.../activemq/artemis/reader/MessageUtil.java | 10 +-
.../artemis/message/CoreMessageTest.java | 2 +-
.../artemis/jms/client/ActiveMQDestination.java | 20 +-
.../artemis/jms/client/ActiveMQMessage.java | 55 ++---
.../artemis/jms/client/ActiveMQQueue.java | 8 +-
.../artemis/jms/client/ActiveMQSession.java | 6 +-
.../jms/client/ActiveMQStreamMessage.java | 2 +-
.../artemis/jms/client/ActiveMQTopic.java | 8 +-
.../protocol/amqp/broker/AMQPMessage.java | 74 ++++--
.../amqp/broker/AMQPSessionCallback.java | 105 ++++----
.../protocol/amqp/converter/AMQPConverter.java | 5 +-
.../amqp/converter/AMQPMessageSupport.java | 49 ++--
.../amqp/converter/AmqpCoreConverter.java | 37 +--
.../proton/ProtonServerReceiverContext.java | 16 +-
.../amqp/proton/ProtonServerSenderContext.java | 54 ++---
.../JMSMappingOutboundTransformerTest.java | 4 +-
.../artemis/core/protocol/mqtt/MQTTSession.java | 8 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 2 +-
.../artemis/core/protocol/mqtt/MQTTUtil.java | 19 +-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../openwire/OpenWireMessageConverter.java | 8 +-
.../core/protocol/openwire/OpenwireMessage.java | 11 +
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
.../core/protocol/openwire/amq/AMQSession.java | 11 +-
.../core/protocol/stomp/StompSession.java | 2 +-
.../ra/inflow/ActiveMQMessageHandler.java | 2 +-
.../core/protocol/ServerPacketDecoder.java | 25 +-
.../protocol/core/impl/CoreSessionCallback.java | 11 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../spi/core/protocol/MessageConverter.java | 3 +-
.../spi/core/protocol/SessionCallback.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 11 +
.../integration/client/AcknowledgeTest.java | 11 +
.../integration/client/HangConsumerTest.java | 2 +-
.../persistence/XmlImportExportTest.java | 2 +-
48 files changed, 1048 insertions(+), 643 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index e24e245..dbf7468 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,8 +21,9 @@ import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.activemq.artemis.utils.AbstractInterner;
+import org.apache.activemq.artemis.utils.AbstractByteBufPool;
+import org.apache.activemq.artemis.utils.AbstractPool;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
/**
@@ -33,129 +34,6 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
- public static final class Interner extends AbstractInterner<SimpleString> {
-
- private final int maxLength;
-
- public Interner(final int capacity, final int maxCharsLength) {
- super(capacity);
- this.maxLength = maxCharsLength;
- }
-
- @Override
- protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
- return SimpleString.isEqual(entry, byteBuf, offset, length);
- }
-
- @Override
- protected boolean canIntern(final ByteBuf byteBuf, final int length) {
- assert length % 2 == 0 : "length must be a multiple of 2";
- final int expectedStringLength = length >> 1;
- return expectedStringLength <= maxLength;
- }
-
- @Override
- protected SimpleString create(final ByteBuf byteBuf, final int length) {
- return readSimpleString(byteBuf, length);
- }
- }
-
- /**
- * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
- * {@code false} otherwise.
- * <p>
- * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
- * length field.
- */
- public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) {
- if (s == null) {
- return false;
- }
- final byte[] chars = s.getData();
- if (chars.length != length)
- return false;
- if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
- if ((offset + length) > bytes.writerIndex()) {
- throw new IndexOutOfBoundsException();
- }
- if (bytes.hasArray()) {
- return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length);
- } else if (bytes.hasMemoryAddress()) {
- return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
- }
- }
- return byteBufIsEqual(chars, bytes, offset, length);
- }
-
- private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) {
- for (int i = 0; i < length; i++)
- if (chars[i] != bytes.getByte(offset + i))
- return false;
- return true;
- }
-
- private static boolean batchOnHeapIsEqual(final byte[] chars,
- final byte[] array,
- final int arrayOffset,
- final int length) {
- final int longCount = length >>> 3;
- final int bytesCount = length & 7;
- int bytesIndex = arrayOffset;
- int charsIndex = 0;
- for (int i = 0; i < longCount; i++) {
- final long charsLong = PlatformDependent.getLong(chars, charsIndex);
- final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
- if (charsLong != bytesLong) {
- return false;
-
- }
- bytesIndex += 8;
- charsIndex += 8;
- }
- for (int i = 0; i < bytesCount; i++) {
- final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
- final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
- if (charsByte != bytesByte) {
- return false;
-
- }
- bytesIndex++;
- charsIndex++;
- }
- return true;
- }
-
- private static boolean batchOffHeapIsEqual(final byte[] chars,
- final long address,
- final int offset,
- final int length) {
- final int longCount = length >>> 3;
- final int bytesCount = length & 7;
- long bytesAddress = address + offset;
- int charsIndex = 0;
- for (int i = 0; i < longCount; i++) {
- final long charsLong = PlatformDependent.getLong(chars, charsIndex);
- final long bytesLong = PlatformDependent.getLong(bytesAddress);
- if (charsLong != bytesLong) {
- return false;
-
- }
- bytesAddress += 8;
- charsIndex += 8;
- }
- for (int i = 0; i < bytesCount; i++) {
- final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
- final byte bytesByte = PlatformDependent.getByte(bytesAddress);
- if (charsByte != bytesByte) {
- return false;
-
- }
- bytesAddress++;
- charsIndex++;
- }
- return true;
- }
-
private static final long serialVersionUID = 4204223851422244307L;
// Attributes
@@ -185,6 +63,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return new SimpleString(string);
}
+ public static SimpleString toSimpleString(final String string, StringSimpleStringPool pool) {
+ if (pool == null) {
+ return toSimpleString(string);
+ }
+ return pool.getOrCreate(string);
+ }
+
// Constructors
// ----------------------------------------------------------------------
@@ -236,6 +121,10 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
data[1] = high;
}
+ public boolean isEmpty() {
+ return data.length == 0;
+ }
+
// CharSequence implementation
// ---------------------------------------------------------------------------
@@ -267,11 +156,26 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return readSimpleString(buffer);
}
+ public static SimpleString readNullableSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) {
+ int b = buffer.readByte();
+ if (b == DataConstants.NULL) {
+ return null;
+ }
+ return readSimpleString(buffer, pool);
+ }
+
public static SimpleString readSimpleString(ByteBuf buffer) {
int len = buffer.readInt();
return readSimpleString(buffer, len);
}
+ public static SimpleString readSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) {
+ if (pool == null) {
+ return readSimpleString(buffer);
+ }
+ return pool.getOrCreate(buffer);
+ }
+
public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
byte[] data = new byte[length];
buffer.readBytes(data);
@@ -381,22 +285,23 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
if (other instanceof SimpleString) {
SimpleString s = (SimpleString) other;
- if (data.length != s.data.length) {
- return false;
- }
-
- for (int i = 0; i < data.length; i++) {
- if (data[i] != s.data[i]) {
- return false;
- }
- }
-
- return true;
+ return ByteUtil.equals(data, s.data);
} else {
return false;
}
}
+ /**
+ * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
+ * {@code false} otherwise.
+ * <p>
+ * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+ * length field.
+ */
+ public boolean equals(final ByteBuf byteBuf, final int offset, final int length) {
+ return ByteUtil.equals(data, byteBuf, offset, length);
+ }
+
@Override
public int hashCode() {
if (hash == 0) {
@@ -575,4 +480,64 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
dst[d++] = (char) (low | high);
}
}
+
+ public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
+
+ private static final int UUID_LENGTH = 36;
+
+ private final int maxLength;
+
+ public ByteBufSimpleStringPool() {
+ this.maxLength = UUID_LENGTH;
+ }
+
+ public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {
+ super(capacity);
+ this.maxLength = maxCharsLength;
+ }
+
+ @Override
+ protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
+ if (entry == null) {
+ return false;
+ }
+ return entry.equals(byteBuf, offset, length);
+ }
+
+ @Override
+ protected boolean canPool(final ByteBuf byteBuf, final int length) {
+ assert length % 2 == 0 : "length must be a multiple of 2";
+ final int expectedStringLength = length >> 1;
+ return expectedStringLength <= maxLength;
+ }
+
+ @Override
+ protected SimpleString create(final ByteBuf byteBuf, final int length) {
+ return readSimpleString(byteBuf, length);
+ }
+ }
+
+ public static final class StringSimpleStringPool extends AbstractPool<String, SimpleString> {
+
+ public StringSimpleStringPool() {
+ super();
+ }
+
+ public StringSimpleStringPool(final int capacity) {
+ super(capacity);
+ }
+
+ @Override
+ protected SimpleString create(String value) {
+ return toSimpleString(value);
+ }
+
+ @Override
+ protected boolean isEqual(SimpleString entry, String value) {
+ if (entry == null) {
+ return false;
+ }
+ return entry.toString().equals(value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
new file mode 100644
index 0000000..87c1b6f
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractByteBufPool<T> {
+
+ public static final int DEFAULT_POOL_CAPACITY = 32;
+
+ private final T[] entries;
+ private final int mask;
+ private final int shift;
+
+ public AbstractByteBufPool() {
+ this(DEFAULT_POOL_CAPACITY);
+ }
+
+ public AbstractByteBufPool(final int capacity) {
+ entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+ mask = entries.length - 1;
+ //log2 of entries.length
+ shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+ }
+
+ /**
+ * Batch hash code implementation that works at its best if {@code bytes}
+ * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
+ */
+ private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
+ if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+ //if the platform allows it, the hash code could be computed without bounds checking
+ if (bytes.hasArray()) {
+ return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
+ } else if (bytes.hasMemoryAddress()) {
+ return offHeapHashCode(bytes.memoryAddress(), offset, length);
+ }
+ }
+ return byteBufHashCode(bytes, offset, length);
+ }
+
+ private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
+ final int intCount = length >>> 1;
+ final int byteCount = length & 1;
+ int hashCode = 1;
+ int arrayIndex = offset;
+ for (int i = 0; i < intCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
+ arrayIndex += 2;
+ }
+ for (int i = 0; i < byteCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
+ }
+ return hashCode;
+ }
+
+ private static int offHeapHashCode(final long address, final int offset, final int length) {
+ final int intCount = length >>> 1;
+ final int byteCount = length & 1;
+ int hashCode = 1;
+ int arrayIndex = offset;
+ for (int i = 0; i < intCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
+ arrayIndex += 2;
+ }
+ for (int i = 0; i < byteCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
+ }
+ return hashCode;
+ }
+
+ private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
+ final int intCount = length >>> 1;
+ final int byteCount = length & 1;
+ int hashCode = 1;
+ int arrayIndex = offset;
+ for (int i = 0; i < intCount; i++) {
+ final short shortLE = byteBuf.getShortLE(arrayIndex);
+ final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
+ hashCode = 31 * hashCode + nativeShort;
+ arrayIndex += 2;
+ }
+ for (int i = 0; i < byteCount; i++) {
+ hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+ }
+ return hashCode;
+ }
+
+ /**
+ * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be pooled,
+ * {@code false} otherwise.
+ */
+ protected abstract boolean canPool(ByteBuf byteBuf, int length);
+
+ /**
+ * Create a new entry.
+ */
+ protected abstract T create(ByteBuf byteBuf, int length);
+
+ /**
+ * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
+ * and {@code length} {@code false} otherwise.
+ */
+ protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
+
+ /**
+ * Returns a pooled entry if possible, a new one otherwise.
+ * <p>
+ * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
+ */
+ public final T getOrCreate(final ByteBuf byteBuf) {
+ final int length = byteBuf.readInt();
+ if (!canPool(byteBuf, length)) {
+ return create(byteBuf, length);
+ } else {
+ if (!byteBuf.isReadable(length)) {
+ throw new IndexOutOfBoundsException();
+ }
+ final int bytesOffset = byteBuf.readerIndex();
+ final int hashCode = hashCode(byteBuf, bytesOffset, length);
+ //fast % operation with power of 2 entries.length
+ final int firstIndex = hashCode & mask;
+ final T firstEntry = entries[firstIndex];
+ if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+ byteBuf.skipBytes(length);
+ return firstEntry;
+ }
+ final int secondIndex = (hashCode >> shift) & mask;
+ final T secondEntry = entries[secondIndex];
+ if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+ byteBuf.skipBytes(length);
+ return secondEntry;
+ }
+ final T internedEntry = create(byteBuf, length);
+ final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+ entries[entryIndex] = internedEntry;
+ return internedEntry;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
deleted file mode 100644
index 7e1fe40..0000000
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.utils;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.internal.MathUtil;
-import io.netty.util.internal.PlatformDependent;
-
-/**
- * Thread-safe {@code <T>} interner.
- * <p>
- * Differently from {@link String#intern()} it contains a fixed amount of entries and
- * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
- * the same entry could be allocated multiple times by concurrent calls.
- */
-public abstract class AbstractInterner<T> {
-
- private final T[] entries;
- private final int mask;
- private final int shift;
-
- public AbstractInterner(final int capacity) {
- entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
- mask = entries.length - 1;
- //log2 of entries.length
- shift = 31 - Integer.numberOfLeadingZeros(entries.length);
- }
-
- /**
- * Batch hash code implementation that works at its best if {@code bytes}
- * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
- */
- private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
- if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
- //if the platform allows it, the hash code could be computed without bounds checking
- if (bytes.hasArray()) {
- return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
- } else if (bytes.hasMemoryAddress()) {
- return offHeapHashCode(bytes.memoryAddress(), offset, length);
- }
- }
- return byteBufHashCode(bytes, offset, length);
- }
-
- private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
- final int intCount = length >>> 1;
- final int byteCount = length & 1;
- int hashCode = 1;
- int arrayIndex = offset;
- for (int i = 0; i < intCount; i++) {
- hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
- arrayIndex += 2;
- }
- for (int i = 0; i < byteCount; i++) {
- hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
- }
- return hashCode;
- }
-
- private static int offHeapHashCode(final long address, final int offset, final int length) {
- final int intCount = length >>> 1;
- final int byteCount = length & 1;
- int hashCode = 1;
- int arrayIndex = offset;
- for (int i = 0; i < intCount; i++) {
- hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
- arrayIndex += 2;
- }
- for (int i = 0; i < byteCount; i++) {
- hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
- }
- return hashCode;
- }
-
- private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
- final int intCount = length >>> 1;
- final int byteCount = length & 1;
- int hashCode = 1;
- int arrayIndex = offset;
- for (int i = 0; i < intCount; i++) {
- final short shortLE = byteBuf.getShortLE(arrayIndex);
- final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
- hashCode = 31 * hashCode + nativeShort;
- arrayIndex += 2;
- }
- for (int i = 0; i < byteCount; i++) {
- hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
- }
- return hashCode;
- }
-
- /**
- * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned,
- * {@code false} otherwise.
- */
- protected abstract boolean canIntern(ByteBuf byteBuf, int length);
-
- /**
- * Create a new entry.
- */
- protected abstract T create(ByteBuf byteBuf, int length);
-
- /**
- * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
- * and {@code length} {@code false} otherwise.
- */
- protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
-
- /**
- * Returns and interned entry if possible, a new one otherwise.
- * <p>
- * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
- */
- public final T intern(final ByteBuf byteBuf, final int length) {
- if (!canIntern(byteBuf, length)) {
- return create(byteBuf, length);
- } else {
- if (!byteBuf.isReadable(length)) {
- throw new IndexOutOfBoundsException();
- }
- final int bytesOffset = byteBuf.readerIndex();
- final int hashCode = hashCode(byteBuf, bytesOffset, length);
- //fast % operation with power of 2 entries.length
- final int firstIndex = hashCode & mask;
- final T firstEntry = entries[firstIndex];
- if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
- byteBuf.skipBytes(length);
- return firstEntry;
- }
- final int secondIndex = (hashCode >> shift) & mask;
- final T secondEntry = entries[secondIndex];
- if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
- byteBuf.skipBytes(length);
- return secondEntry;
- }
- final T internedEntry = create(byteBuf, length);
- final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
- entries[entryIndex] = internedEntry;
- return internedEntry;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
new file mode 100644
index 0000000..cc42e8f
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractPool<I, O> {
+
+ public static final int DEFAULT_POOL_CAPACITY = 32;
+
+ private final O[] entries;
+ private final int mask;
+ private final int shift;
+
+ public AbstractPool() {
+ this(DEFAULT_POOL_CAPACITY);
+ }
+
+ public AbstractPool(final int capacity) {
+ entries = (O[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+ mask = entries.length - 1;
+ //log2 of entries.length
+ shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+ }
+
+ /**
+ * Create a new entry.
+ */
+ protected abstract O create(I value);
+
+ /**
+ * Returns {@code true} if the {@code entry} content is equal to {@code value};
+ */
+ protected abstract boolean isEqual(O entry, I value);
+
+ protected int hashCode(I value) {
+ return value.hashCode();
+ }
+
+ /**
+ * Returns and interned entry if possible, a new one otherwise.
+ * <p>
+ * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
+ */
+ public final O getOrCreate(final I value) {
+ if (value == null) {
+ return null;
+ }
+ final int hashCode = hashCode(value);
+ //fast % operation with power of 2 entries.length
+ final int firstIndex = hashCode & mask;
+ final O firstEntry = entries[firstIndex];
+ if (isEqual(firstEntry, value)) {
+ return firstEntry;
+ }
+ final int secondIndex = (hashCode >> shift) & mask;
+ final O secondEntry = entries[secondIndex];
+ if (isEqual(secondEntry, value)) {
+ return secondEntry;
+ }
+ final O internedEntry = create(value);
+ final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+ entries[entryIndex] = internedEntry;
+ return internedEntry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index e70891d..8835797 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -22,6 +22,7 @@ import java.util.regex.Pattern;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@@ -207,4 +208,125 @@ public class ByteUtil {
throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text);
}
}
+
+ public static boolean equals(final byte[] left, final byte[] right) {
+ return equals(left, right, 0, right.length);
+ }
+
+ public static boolean equals(final byte[] left,
+ final byte[] right,
+ final int rightOffset,
+ final int rightLength) {
+ if (left == right)
+ return true;
+ if (left == null || right == null)
+ return false;
+ if (left.length != rightLength)
+ return false;
+ if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+ return equalsUnsafe(left, right, rightOffset, rightLength);
+ } else {
+ return equalsSafe(left, right, rightOffset, rightLength);
+ }
+ }
+
+ private static boolean equalsSafe(byte[] left, byte[] right, int rightOffset, int rightLength) {
+ for (int i = 0; i < rightLength; i++)
+ if (left[i] != right[rightOffset + i])
+ return false;
+ return true;
+ }
+
+ private static boolean equalsUnsafe(final byte[] left,
+ final byte[] right,
+ final int rightOffset,
+ final int rightLength) {
+ final int longCount = rightLength >>> 3;
+ final int bytesCount = rightLength & 7;
+ int bytesIndex = rightOffset;
+ int charsIndex = 0;
+ for (int i = 0; i < longCount; i++) {
+ final long charsLong = PlatformDependent.getLong(left, charsIndex);
+ final long bytesLong = PlatformDependent.getLong(right, bytesIndex);
+ if (charsLong != bytesLong) {
+ return false;
+ }
+ bytesIndex += 8;
+ charsIndex += 8;
+ }
+ for (int i = 0; i < bytesCount; i++) {
+ final byte charsByte = PlatformDependent.getByte(left, charsIndex);
+ final byte bytesByte = PlatformDependent.getByte(right, bytesIndex);
+ if (charsByte != bytesByte) {
+ return false;
+ }
+ bytesIndex++;
+ charsIndex++;
+ }
+ return true;
+ }
+
+
+ /**
+ * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
+ * {@code false} otherwise.
+ * <p>
+ * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+ * length field.
+ */
+ public static boolean equals(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
+ if (bytes.length != length)
+ return false;
+ if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+ if ((offset + length) > byteBuf.writerIndex()) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (byteBuf.hasArray()) {
+ return equals(bytes, byteBuf.array(), byteBuf.arrayOffset() + offset, length);
+ } else if (byteBuf.hasMemoryAddress()) {
+ return equalsOffHeap(bytes, byteBuf.memoryAddress(), offset, length);
+ }
+ }
+ return equalsOnHeap(bytes, byteBuf, offset, length);
+ }
+
+ private static boolean equalsOnHeap(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
+ if (bytes.length != length)
+ return false;
+ for (int i = 0; i < length; i++)
+ if (bytes[i] != byteBuf.getByte(offset + i))
+ return false;
+ return true;
+ }
+
+ private static boolean equalsOffHeap(final byte[] bytes,
+ final long address,
+ final int offset,
+ final int length) {
+ final int longCount = length >>> 3;
+ final int bytesCount = length & 7;
+ long bytesAddress = address + offset;
+ int charsIndex = 0;
+ for (int i = 0; i < longCount; i++) {
+ final long charsLong = PlatformDependent.getLong(bytes, charsIndex);
+ final long bytesLong = PlatformDependent.getLong(bytesAddress);
+ if (charsLong != bytesLong) {
+ return false;
+
+ }
+ bytesAddress += 8;
+ charsIndex += 8;
+ }
+ for (int i = 0; i < bytesCount; i++) {
+ final byte charsByte = PlatformDependent.getByte(bytes, charsIndex);
+ final byte bytesByte = PlatformDependent.getByte(bytesAddress);
+ if (charsByte != bytesByte) {
+ return false;
+
+ }
+ bytesAddress++;
+ charsIndex++;
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index a3e4876..56beb76 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,7 +28,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
-import org.apache.activemq.artemis.utils.AbstractInterner;
+import org.apache.activemq.artemis.utils.AbstractByteBufPool;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -332,8 +332,7 @@ public class TypedProperties {
}
public synchronized void decode(final ByteBuf buffer,
- final SimpleString.Interner keyInterner,
- final StringValue.Interner valueInterner) {
+ final TypedPropertiesDecoderPools keyValuePools) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
@@ -346,15 +345,7 @@ public class TypedProperties {
size = 0;
for (int i = 0; i < numHeaders; i++) {
- final SimpleString key;
- int len = buffer.readInt();
- if (keyInterner != null) {
- key = keyInterner.intern(buffer, len);
- } else {
- byte[] data = new byte[len];
- buffer.readBytes(data);
- key = new SimpleString(data);
- }
+ final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
byte type = buffer.readByte();
@@ -412,12 +403,7 @@ public class TypedProperties {
break;
}
case STRING: {
- if (valueInterner != null) {
- final int length = buffer.readInt();
- val = valueInterner.intern(buffer, length);
- } else {
- val = new StringValue(buffer);
- }
+ val = StringValue.readStringValue(buffer, keyValuePools == null ? null : keyValuePools.getPropertyValuesPool());
doPutValue(key, val);
break;
}
@@ -430,7 +416,7 @@ public class TypedProperties {
}
public synchronized void decode(final ByteBuf buffer) {
- decode(buffer, null, null);
+ decode(buffer, null);
}
public synchronized void encode(final ByteBuf buffer) {
@@ -901,25 +887,61 @@ public class TypedProperties {
public static final class StringValue extends PropertyValue {
- public static final class Interner extends AbstractInterner<StringValue> {
+ final SimpleString val;
+
+ private StringValue(final SimpleString val) {
+ this.val = val;
+ }
+
+ static StringValue readStringValue(final ByteBuf byteBuf, ByteBufStringValuePool pool) {
+ if (pool == null) {
+ return new StringValue(SimpleString.readSimpleString(byteBuf));
+ } else {
+ return pool.getOrCreate(byteBuf);
+ }
+ }
+
+ @Override
+ public Object getValue() {
+ return val;
+ }
+
+ @Override
+ public void write(final ByteBuf buffer) {
+ buffer.writeByte(DataConstants.STRING);
+ SimpleString.writeSimpleString(buffer, val);
+ }
+
+ @Override
+ public int encodeSize() {
+ return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
+ }
+
+ public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
+
+ private static final int UUID_LENGTH = 36;
private final int maxLength;
- public Interner(final int capacity, final int maxCharsLength) {
+ public ByteBufStringValuePool() {
+ this.maxLength = UUID_LENGTH;
+ }
+
+ public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
- if (entry == null) {
+ if (entry == null || entry.val == null) {
return false;
}
- return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+ return entry.val.equals(byteBuf, offset, length);
}
@Override
- protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+ protected boolean canPool(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
@@ -930,31 +952,53 @@ public class TypedProperties {
return new StringValue(SimpleString.readSimpleString(byteBuf, length));
}
}
+ }
- final SimpleString val;
+ public static class TypedPropertiesDecoderPools {
- private StringValue(final SimpleString val) {
- this.val = val;
+ private SimpleString.ByteBufSimpleStringPool propertyKeysPool;
+ private TypedProperties.StringValue.ByteBufStringValuePool propertyValuesPool;
+
+ public TypedPropertiesDecoderPools() {
+ this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool();
+ this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
}
- private StringValue(final ByteBuf buffer) {
- val = SimpleString.readSimpleString(buffer);
+ public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
+ this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
+ this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
}
- @Override
- public Object getValue() {
- return val;
+ public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
+ return propertyKeysPool;
}
- @Override
- public void write(final ByteBuf buffer) {
- buffer.writeByte(DataConstants.STRING);
- SimpleString.writeSimpleString(buffer, val);
+ public TypedProperties.StringValue.ByteBufStringValuePool getPropertyValuesPool() {
+ return propertyValuesPool;
}
+ }
- @Override
- public int encodeSize() {
- return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
+ public static class TypedPropertiesStringSimpleStringPools {
+
+ private SimpleString.StringSimpleStringPool propertyKeysPool;
+ private SimpleString.StringSimpleStringPool propertyValuesPool;
+
+ public TypedPropertiesStringSimpleStringPools() {
+ this.propertyKeysPool = new SimpleString.StringSimpleStringPool();
+ this.propertyValuesPool = new SimpleString.StringSimpleStringPool();
+ }
+
+ public TypedPropertiesStringSimpleStringPools(int keyPoolCapacity, int valuePoolCapacity) {
+ this.propertyKeysPool = new SimpleString.StringSimpleStringPool(keyPoolCapacity);
+ this.propertyValuesPool = new SimpleString.StringSimpleStringPool(valuePoolCapacity);
+ }
+
+ public SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+ return propertyKeysPool;
+ }
+
+ public SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+ return propertyValuesPool;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index ddb8a3b..d24cd95 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
/**
@@ -587,6 +588,8 @@ public interface Message {
Message putStringProperty(SimpleString key, SimpleString value);
+ Message putStringProperty(SimpleString key, String value);
+
/**
* Returns the size of the <em>encoded</em> message.
*/
@@ -649,6 +652,9 @@ public interface Message {
/** This should make you convert your message into Core format. */
ICoreMessage toCore();
+ /** This should make you convert your message into Core format. */
+ ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);
+
int getMemoryEstimate();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 91fb6ca..8068aa9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -59,6 +60,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
public ClientMessageImpl() {
}
+ public ClientMessageImpl(CoreMessageObjectPools coreMessageObjectPools) {
+ super(coreMessageObjectPools);
+ }
+
protected ClientMessageImpl(ClientMessageImpl other) {
super(other);
}
@@ -96,11 +101,22 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
final long expiration,
final long timestamp,
final byte priority,
- final int initialMessageBufferSize) {
+ final int initialMessageBufferSize,
+ final CoreMessageObjectPools coreMessageObjectPools) {
+ super(coreMessageObjectPools);
this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
setPriority(priority).initBuffer(initialMessageBufferSize);
}
+ public ClientMessageImpl(final byte type,
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority,
+ final int initialMessageBufferSize) {
+ this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null);
+ }
+
@Override
public TypedProperties getProperties() {
return this.checkProperties();
@@ -286,6 +302,11 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
}
@Override
+ public ClientMessageImpl putStringProperty(final SimpleString key, final String value) {
+ return (ClientMessageImpl) super.putStringProperty(key, value);
+ }
+
+ @Override
public ClientMessageImpl putObjectProperty(final SimpleString key,
final Object value) throws ActiveMQPropertyConversionException {
return (ClientMessageImpl) super.putObjectProperty(key, value);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 61784ad..b5f8a1b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -148,6 +149,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final Executor closeExecutor;
+ private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name,
final String username,
@@ -869,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final long expiration,
final long timestamp,
final byte priority) {
- return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize);
+ return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize, coreMessageObjectPools);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 4ebf97e..888b785 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -93,18 +93,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
protected volatile TypedProperties properties;
- private final SimpleString.Interner keysInterner;
- private final TypedProperties.StringValue.Interner valuesInterner;
+ private final CoreMessageObjectPools coreMessageObjectPools;
- public CoreMessage(final SimpleString.Interner keysInterner,
- final TypedProperties.StringValue.Interner valuesInterner) {
- this.keysInterner = keysInterner;
- this.valuesInterner = valuesInterner;
+ public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
+ this.coreMessageObjectPools = coreMessageObjectPools;
}
public CoreMessage() {
- this.keysInterner = null;
- this.valuesInterner = null;
+ this.coreMessageObjectPools = null;
}
/** On core there's no delivery annotation */
@@ -326,10 +322,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
public CoreMessage(long id, int bufferSize) {
+ this(id, bufferSize, null);
+ }
+
+ public CoreMessage(long id, int bufferSize, CoreMessageObjectPools coreMessageObjectPools) {
this.initBuffer(bufferSize);
this.setMessageID(id);
- this.keysInterner = null;
- this.valuesInterner = null;
+ this.coreMessageObjectPools = coreMessageObjectPools;
}
protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -343,8 +342,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
this.timestamp = other.timestamp;
this.priority = other.priority;
this.userID = other.userID;
- this.keysInterner = other.keysInterner;
- this.valuesInterner = other.valuesInterner;
+ this.coreMessageObjectPools = other.coreMessageObjectPools;
if (copyProperties != null) {
this.properties = new TypedProperties(copyProperties);
}
@@ -424,7 +422,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public CoreMessage setValidatedUserID(String validatedUserID) {
- putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID));
+ putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool()));
return this;
}
@@ -479,7 +477,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
TypedProperties properties = new TypedProperties();
if (buffer != null && propertiesLocation >= 0) {
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
- properties.decode(byteBuf, keysInterner, valuesInterner);
+ properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
}
this.properties = properties;
}
@@ -543,17 +541,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong();
- int b = buffer.readByte();
- if (b != DataConstants.NULL) {
- final int length = buffer.readInt();
- if (keysInterner != null) {
- address = keysInterner.intern(buffer, length);
- } else {
- address = SimpleString.readSimpleString(buffer, length);
- }
- } else {
- address = null;
- }
+
+ address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16];
buffer.readBytes(bytes);
@@ -571,7 +560,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex();
} else {
properties = new TypedProperties();
- properties.decode(buffer, keysInterner, valuesInterner);
+ properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
}
}
@@ -671,7 +660,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public CoreMessage setAddress(String address) {
messageChanged();
- this.address = SimpleString.toSimpleString(address);
+ this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
return this;
}
@@ -703,7 +692,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putBooleanProperty(final String key, final boolean value) {
messageChanged();
checkProperties();
- properties.putBooleanProperty(new SimpleString(key), value);
+ properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -724,7 +713,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties();
- return properties.getBooleanProperty(new SimpleString(key));
+ return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -739,7 +728,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putByteProperty(final String key, final byte value) {
messageChanged();
checkProperties();
- properties.putByteProperty(new SimpleString(key), value);
+ properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -752,7 +741,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
- return getByteProperty(SimpleString.toSimpleString(key));
+ return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -768,7 +757,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putBytesProperty(final String key, final byte[] value) {
messageChanged();
checkProperties();
- properties.putBytesProperty(new SimpleString(key), value);
+ properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -780,7 +769,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
- return getBytesProperty(new SimpleString(key));
+ return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -795,7 +784,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putCharProperty(String key, char value) {
messageChanged();
checkProperties();
- properties.putCharProperty(new SimpleString(key), value);
+ properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -811,7 +800,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putShortProperty(final String key, final short value) {
messageChanged();
checkProperties();
- properties.putShortProperty(new SimpleString(key), value);
+ properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -827,7 +816,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putIntProperty(final String key, final int value) {
messageChanged();
checkProperties();
- properties.putIntProperty(new SimpleString(key), value);
+ properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -854,7 +843,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putLongProperty(final String key, final long value) {
messageChanged();
checkProperties();
- properties.putLongProperty(new SimpleString(key), value);
+ properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -882,7 +871,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putFloatProperty(final String key, final float value) {
messageChanged();
checkProperties();
- properties.putFloatProperty(new SimpleString(key), value);
+ properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -898,7 +887,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putDoubleProperty(final String key, final double value) {
messageChanged();
checkProperties();
- properties.putDoubleProperty(new SimpleString(key), value);
+ properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -924,10 +913,19 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
+ public CoreMessage putStringProperty(final SimpleString key, final String value) {
+ messageChanged();
+ checkProperties();
+ properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool()));
+ return this;
+ }
+
+
+ @Override
public CoreMessage putStringProperty(final String key, final String value) {
messageChanged();
checkProperties();
- properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
+ properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool()));
return this;
}
@@ -943,7 +941,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Object getObjectProperty(final String key) {
checkProperties();
- return getObjectProperty(SimpleString.toSimpleString(key));
+ return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -955,7 +953,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
messageChanged();
- putObjectProperty(new SimpleString(key), value);
+ putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
}
@@ -968,7 +966,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties();
- return properties.getShortProperty(new SimpleString(key));
+ return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -980,7 +978,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties();
- return properties.getFloatProperty(new SimpleString(key));
+ return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -996,7 +994,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
- return getStringProperty(new SimpleString(key));
+ return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -1008,7 +1006,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties();
- return properties.getSimpleStringProperty(new SimpleString(key));
+ return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -1025,7 +1023,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public Object removeProperty(final String key) {
messageChanged();
checkProperties();
- Object oldValue = properties.removeProperty(new SimpleString(key));
+ Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
if (oldValue != null) {
messageChanged();
}
@@ -1041,7 +1039,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public boolean containsProperty(final String key) {
checkProperties();
- return properties.containsProperty(new SimpleString(key));
+ return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
}
@Override
@@ -1116,6 +1114,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
+ public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
+ return this;
+ }
+
+ @Override
public String toString() {
try {
checkProperties();
@@ -1135,4 +1138,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return new java.util.Date(timestamp).toString();
}
}
+
+ private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+ return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
+ }
+
+ private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+ return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
new file mode 100644
index 0000000..d4e3ed1
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message.impl;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+
+public class CoreMessageObjectPools {
+
+ private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
+ private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+
+ private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+ private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+ private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
+
+ public CoreMessageObjectPools() {
+ }
+
+ public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
+ return addressDecoderPool.get();
+ }
+
+ public SimpleString.StringSimpleStringPool getAddressStringSimpleStringPool() {
+ return addressStringSimpleStringPool.get();
+ }
+
+ public SimpleString.StringSimpleStringPool getGroupIdStringSimpleStringPool() {
+ return groupIdStringSimpleStringPool.get();
+ }
+
+ public TypedProperties.TypedPropertiesDecoderPools getPropertiesDecoderPools() {
+ return propertiesDecoderPools.get();
+ }
+
+ public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
+ return propertiesStringSimpleStringPools.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 787e499..ad8c7a9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@@ -32,11 +33,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ClientPacketDecoder extends PacketDecoder {
private static final long serialVersionUID = 6952614096979334582L;
- public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
-
- protected ClientPacketDecoder() {
-
- }
+ protected final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
@Override
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
@@ -56,9 +53,9 @@ public class ClientPacketDecoder extends PacketDecoder {
switch (packetType) {
case SESS_RECEIVE_MSG: {
if (connection.isVersionBeforeAddressChange()) {
- packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
+ packet = new SessionReceiveMessage_1X(new ClientMessageImpl(coreMessageObjectPools));
} else {
- packet = new SessionReceiveMessage(new ClientMessageImpl());
+ packet = new SessionReceiveMessage(new ClientMessageImpl(coreMessageObjectPools));
}
break;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index f0005ff..c58a0bd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -511,7 +511,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
}
protected PacketDecoder createPacketDecoder() {
- return ClientPacketDecoder.INSTANCE;
+ return new ClientPacketDecoder();
}
private void forceReturnChannel1(ActiveMQException cause) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 2660f96..e8f5920 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -109,12 +109,18 @@ public class MessageUtil {
return message.getSimpleStringProperty(REPLYTO_HEADER_NAME);
}
- public static void setJMSReplyTo(Message message, final SimpleString dest) {
-
+ public static void setJMSReplyTo(Message message, final String dest) {
if (dest == null) {
message.removeProperty(REPLYTO_HEADER_NAME);
} else {
+ message.putStringProperty(REPLYTO_HEADER_NAME, dest);
+ }
+ }
+ public static void setJMSReplyTo(Message message, final SimpleString dest) {
+ if (dest == null) {
+ message.removeProperty(REPLYTO_HEADER_NAME);
+ } else {
message.putStringProperty(REPLYTO_HEADER_NAME, dest);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index ec94011..310b4ed 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -337,7 +337,7 @@ public class CoreMessageTest {
public String generate(String body) throws Exception {
- ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024);
+ ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, null);
TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
message.setAddress(ADDRESS);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 626dd4d..7750564 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -99,26 +99,28 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
}
}
- public static String createQueueNameForSubscription(final boolean isDurable,
+ public static SimpleString createQueueNameForSubscription(final boolean isDurable,
final String clientID,
final String subscriptionName) {
+ final String queueName;
if (clientID != null) {
if (isDurable) {
- return ActiveMQDestination.escape(clientID) + SEPARATOR +
+ queueName = ActiveMQDestination.escape(clientID) + SEPARATOR +
ActiveMQDestination.escape(subscriptionName);
} else {
- return "nonDurable" + SEPARATOR +
+ queueName = "nonDurable" + SEPARATOR +
ActiveMQDestination.escape(clientID) + SEPARATOR +
ActiveMQDestination.escape(subscriptionName);
}
} else {
if (isDurable) {
- return ActiveMQDestination.escape(subscriptionName);
+ queueName = ActiveMQDestination.escape(subscriptionName);
} else {
- return "nonDurable" + SEPARATOR +
+ queueName = "nonDurable" + SEPARATOR +
ActiveMQDestination.escape(subscriptionName);
}
}
+ return SimpleString.toSimpleString(queueName);
}
public static String createQueueNameForSharedSubscription(final boolean isDurable,
@@ -192,10 +194,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return new ActiveMQQueue(address);
}
+ public static ActiveMQQueue createQueue(final SimpleString address) {
+ return new ActiveMQQueue(address);
+ }
+
public static ActiveMQTopic createTopic(final String address) {
return new ActiveMQTopic(address);
}
+ public static ActiveMQTopic createTopic(final SimpleString address) {
+ return new ActiveMQTopic(address);
+ }
+
public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) {
return new ActiveMQTemporaryQueue(address, session);
}