You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/05 16:20:07 UTC
[activemq-artemis] branch master updated: ARTEMIS-2604 Optimize
journal loading
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 3282f10 ARTEMIS-2604 Optimize journal loading
new 1bc88a7 This closes #2950
3282f10 is described below
commit 3282f105bb3e74afcfa10959a9b0a5486f192d74
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Tue Jan 21 23:23:55 2020 +0100
ARTEMIS-2604 Optimize journal loading
- Avoid some Properties Decoding, checking if we need certain properties like scheduled delivery
- Avoid creating some unnecessary SimpleString instances
- Removed some intermediate ActiveMQBuffer allocation
- Removed some intermediate UnreleasableByteBuf allocation
---
.../activemq/artemis/utils/CompositeAddress.java | 20 ++++-
.../artemis/utils/collections/TypedProperties.java | 73 +++++++++++++++++
.../artemis/utils/TypedPropertiesTest.java | 92 ++++++++++++++++++++++
.../apache/activemq/artemis/api/core/Message.java | 8 ++
.../artemis/core/message/impl/CoreMessage.java | 35 +++++++-
.../journal/AbstractJournalStorageManager.java | 6 +-
.../core/postoffice/impl/PostOfficeImpl.java | 14 +++-
7 files changed, 240 insertions(+), 8 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java
index 6376575..b792172 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java
@@ -35,7 +35,15 @@ public class CompositeAddress {
}
public static SimpleString extractQueueName(SimpleString name) {
- return name == null ? null : new SimpleString(extractQueueName(name.toString()));
+ if (name == null) {
+ return null;
+ }
+ final String nameString = name.toString();
+ final String queueName = extractQueueName(nameString);
+ if (queueName.equals(nameString)) {
+ return name;
+ }
+ return new SimpleString(queueName);
}
public static String extractQueueName(String queue) {
@@ -50,7 +58,15 @@ public class CompositeAddress {
}
public static SimpleString extractAddressName(SimpleString address) {
- return address == null ? null : new SimpleString(extractAddressName(address.toString()));
+ if (address == null) {
+ return null;
+ }
+ final String addrString = address.toString();
+ final String addressName = extractAddressName(addrString);
+ if (addressName.equals(addrString)) {
+ return address;
+ }
+ return new SimpleString(addressName);
}
public static String extractAddressName(String address) {
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index c6b513d..223da17 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -372,6 +372,79 @@ public class TypedProperties {
}
}
+ /**
+ * Performs a search among the valid key properties contained in {@code buffer}, starting from {@code from}
+ * assuming it to be a valid encoded {@link TypedProperties} content.
+ *
+ * @throws IllegalStateException if any not-valid property is found while searching the {@code key} property
+ */
+ public static boolean searchProperty(SimpleString key, ByteBuf buffer, int startIndex) {
+ // It won't implement a straight linear search for key
+ // because it would risk to find a SimpleString encoded property value
+ // equals to the key we're searching for!
+ int index = startIndex;
+ byte b = buffer.getByte(index);
+ index++;
+ if (b == DataConstants.NULL) {
+ return false;
+ }
+ final int numHeaders = buffer.getInt(index);
+ index += Integer.BYTES;
+ for (int i = 0; i < numHeaders; i++) {
+ final int keyLength = buffer.getInt(index);
+ index += Integer.BYTES;
+ if (key.equals(buffer, index, keyLength)) {
+ return true;
+ }
+ if (i == numHeaders - 1) {
+ return false;
+ }
+ index += keyLength;
+ byte type = buffer.getByte(index);
+ index++;
+ switch (type) {
+ case NULL: {
+ break;
+ }
+ case CHAR:
+ case SHORT: {
+ index += Short.BYTES;
+ break;
+ }
+ case BOOLEAN:
+ case BYTE: {
+ index += Byte.BYTES;
+ break;
+ }
+ case BYTES:
+ case STRING: {
+ index += (Integer.BYTES + buffer.getInt(index));
+ break;
+ }
+ case INT: {
+ index += Integer.BYTES;
+ break;
+ }
+ case LONG: {
+ index += Long.BYTES;
+ break;
+ }
+ case FLOAT: {
+ index += Float.BYTES;
+ break;
+ }
+ case DOUBLE: {
+ index += Double.BYTES;
+ break;
+ }
+ default: {
+ throw ActiveMQUtilBundle.BUNDLE.invalidType(type);
+ }
+ }
+ }
+ return false;
+ }
+
public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools) {
byte b = buffer.readByte();
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index e876f00..e9e7322 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import static org.apache.activemq.artemis.utils.collections.TypedProperties.searchProperty;
+
public class TypedPropertiesTest {
private static void assertEqualsTypeProperties(final TypedProperties expected, final TypedProperties actual) {
@@ -250,6 +252,96 @@ public class TypedPropertiesTest {
Assert.assertFalse(properties.clearInternalProperties());
}
+ @Test
+ public void testSearchPropertyIfNone() {
+ TypedProperties props = new TypedProperties();
+ ByteBuf buf = Unpooled.buffer(Byte.BYTES, Byte.BYTES);
+ props.encode(buf);
+ buf.resetReaderIndex();
+ Assert.assertFalse("There is no property", searchProperty(SimpleString.toSimpleString(""), buf, 0));
+ }
+
+ @Test
+ public void testSearchAllProperties() {
+ TypedProperties props = new TypedProperties();
+ props.putByteProperty(RandomUtil.randomSimpleString(), RandomUtil.randomByte());
+ props.putBytesProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBytes());
+ props.putBytesProperty(RandomUtil.randomSimpleString(), null);
+ props.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean());
+ props.putShortProperty(RandomUtil.randomSimpleString(), RandomUtil.randomShort());
+ props.putIntProperty(RandomUtil.randomSimpleString(), RandomUtil.randomInt());
+ props.putLongProperty(RandomUtil.randomSimpleString(), RandomUtil.randomLong());
+ props.putFloatProperty(RandomUtil.randomSimpleString(), RandomUtil.randomFloat());
+ props.putDoubleProperty(RandomUtil.randomSimpleString(), RandomUtil.randomDouble());
+ props.putCharProperty(RandomUtil.randomSimpleString(), RandomUtil.randomChar());
+ props.putSimpleStringProperty(RandomUtil.randomSimpleString(), RandomUtil.randomSimpleString());
+ props.putSimpleStringProperty(RandomUtil.randomSimpleString(), null);
+ final SimpleString value = RandomUtil.randomSimpleString();
+ props.putSimpleStringProperty(RandomUtil.randomSimpleString(), value);
+ ByteBuf buf = Unpooled.buffer();
+ props.encode(buf);
+ buf.resetReaderIndex();
+ Assert.assertFalse(searchProperty(value, buf, 0));
+ props.forEachKey(key -> {
+ Assert.assertTrue(searchProperty(key, buf, 0));
+ Assert.assertTrue(searchProperty(SimpleString.toSimpleString(key.toString()), buf, 0));
+ // concat a string just to check if the search won't perform an eager search to find the string pattern
+ Assert.assertFalse(searchProperty(key.concat(" "), buf, 0));
+ });
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testSearchPartiallyEncodedBuffer() {
+ final int expectedLength = Integer.BYTES + Byte.BYTES;
+ ByteBuf buf = Unpooled.buffer(expectedLength, expectedLength);
+ buf.writeByte(DataConstants.NOT_NULL);
+ buf.writeInt(1);
+ buf.resetReaderIndex();
+ searchProperty(SimpleString.toSimpleString(" "), buf, 0);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testSearchPartiallyEncodedString() {
+ final int expectedLength = Integer.BYTES + Byte.BYTES + Integer.BYTES;
+ ByteBuf buf = Unpooled.buffer(expectedLength, expectedLength);
+ buf.writeByte(DataConstants.NOT_NULL);
+ buf.writeInt(1);
+ //SimpleString::data length
+ buf.writeInt(2);
+ buf.resetReaderIndex();
+ searchProperty(SimpleString.toSimpleString("a"), buf, 0);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSearchWithInvalidTypeBeforeEnd() {
+ ByteBuf buf = Unpooled.buffer();
+ buf.writeByte(DataConstants.NOT_NULL);
+ // fake 2 properties
+ buf.writeInt(2);
+ // 1 key with length 2
+ buf.writeInt(2);
+ buf.writeShort(3);
+ // invalid type
+ buf.writeByte(Byte.MIN_VALUE);
+ buf.resetReaderIndex();
+ searchProperty(SimpleString.toSimpleString(""), buf, 0);
+ }
+
+ @Test
+ public void testSearchWithInvalidTypeEnd() {
+ ByteBuf buf = Unpooled.buffer();
+ buf.writeByte(DataConstants.NOT_NULL);
+ // fake 1 property
+ buf.writeInt(1);
+ // 1 key with length 2
+ buf.writeInt(2);
+ buf.writeShort(3);
+ // invalid type
+ buf.writeByte(Byte.MIN_VALUE);
+ buf.resetReaderIndex();
+ Assert.assertFalse(searchProperty(SimpleString.toSimpleString(""), buf, 0));
+ }
+
@Before
public void setUp() throws Exception {
props = new TypedProperties();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b6c7acb..1de8150 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -186,6 +186,14 @@ public interface Message {
// only on core
}
+ /**
+ * Search for the existence of the property: an implementor can save
+ * the message to be decoded, if possible.
+ */
+ default boolean hasScheduledDeliveryTime() {
+ return getScheduledDeliveryTime() != null;
+ }
+
default RoutingType getRoutingType() {
return null;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 45c4bdd..9050550 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -19,12 +19,14 @@ package org.apache.activemq.artemis.core.message.impl;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.Set;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -133,7 +135,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
public CoreMessage initBuffer(final int initialMessageBufferSize) {
- buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf();
+ buffer = Unpooled.buffer(initialMessageBufferSize);
// There's a bug in netty which means a dynamic buffer won't resize until you write a byte
buffer.writeByte((byte) 0);
@@ -1085,6 +1087,37 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
+ public boolean hasScheduledDeliveryTime() {
+ return searchProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ }
+
+ /**
+ * Differently from {@link #containsProperty(SimpleString)}, this method can save decoding the message,
+ * performing a search of the {@code key} property and falling back to {@link #containsProperty(SimpleString)}
+ * if not possible or if already decoded.
+ */
+ public boolean searchProperty(SimpleString key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ TypedProperties properties = this.properties;
+ if (properties != null) {
+ return properties.containsProperty(key);
+ }
+ synchronized (this) {
+ final ByteBuf buffer = this.buffer;
+ // acquiring the lock here, although heavy-weight, is the safer way to do this,
+ // because we cannot trust that a racing thread won't modify buffer
+ if (buffer == null) {
+ throw new NullPointerException("buffer cannot be null");
+ }
+ final int propertiesLocation = this.propertiesLocation;
+ if (propertiesLocation < 0) {
+ throw new IllegalStateException("propertiesLocation = " + propertiesLocation);
+ }
+ return TypedProperties.searchProperty(key, buffer, propertiesLocation);
+ }
+ }
+
+ @Override
public boolean containsProperty(final SimpleString key) {
return getProperties().containsProperty(key);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 66299af..effb7e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -41,11 +41,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.Xid;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
@@ -869,7 +871,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
byte[] data = record.data;
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+ // We can make this byte[] buffer releasable, because subsequent methods using it are not supposed
+ // to release it. It saves creating useless UnreleasableByteBuf wrappers
+ ChannelBufferWrapper buff = new ChannelBufferWrapper(Unpooled.wrappedBuffer(data), true);
byte recordType = record.getUserRecordType();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index e96f808..f35e590 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1181,9 +1181,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
MessageReference reference = MessageReference.Factory.createReference(message, queue);
- Long scheduledDeliveryTime = message.getScheduledDeliveryTime();
- if (scheduledDeliveryTime != null) {
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ Long scheduledDeliveryTime;
+ if (message.hasScheduledDeliveryTime()) {
+ scheduledDeliveryTime = message.getScheduledDeliveryTime();
+ if (scheduledDeliveryTime != null) {
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
}
message.incrementDurableRefCount();
@@ -1433,7 +1436,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Transaction tx = context.getTransaction();
- Long deliveryTime = message.getScheduledDeliveryTime();
+ Long deliveryTime = null;
+ if (message.hasScheduledDeliveryTime()) {
+ deliveryTime = message.getScheduledDeliveryTime();
+ }
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store = pagingManager.getPageStore(entry.getKey());