You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/05 16:20:07 UTC

[activemq-artemis] branch master updated: ARTEMIS-2604 Optimize journal loading

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3282f10  ARTEMIS-2604 Optimize journal loading
     new 1bc88a7  This closes #2950
3282f10 is described below

commit 3282f105bb3e74afcfa10959a9b0a5486f192d74
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Tue Jan 21 23:23:55 2020 +0100

    ARTEMIS-2604 Optimize journal loading
    
    - Avoid some Properties Decoding, checking if we need certain properties like scheduled delivery
    - Avoid creating some unnecessary SimpleString instances
    - Removed some intermediate ActiveMQBuffer allocation
    - Removed some intermediate UnreleasableByteBuf allocation
---
 .../activemq/artemis/utils/CompositeAddress.java   | 20 ++++-
 .../artemis/utils/collections/TypedProperties.java | 73 +++++++++++++++++
 .../artemis/utils/TypedPropertiesTest.java         | 92 ++++++++++++++++++++++
 .../apache/activemq/artemis/api/core/Message.java  |  8 ++
 .../artemis/core/message/impl/CoreMessage.java     | 35 +++++++-
 .../journal/AbstractJournalStorageManager.java     |  6 +-
 .../core/postoffice/impl/PostOfficeImpl.java       | 14 +++-
 7 files changed, 240 insertions(+), 8 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java
index 6376575..b792172 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java
@@ -35,7 +35,15 @@ public class CompositeAddress {
    }
 
    public static SimpleString extractQueueName(SimpleString name) {
-      return name == null ? null : new SimpleString(extractQueueName(name.toString()));
+      if (name == null) {
+         return null;
+      }
+      final String nameString = name.toString();
+      final String queueName = extractQueueName(nameString);
+      if (queueName.equals(nameString)) {
+         return name;
+      }
+      return new SimpleString(queueName);
    }
 
    public static String extractQueueName(String queue) {
@@ -50,7 +58,15 @@ public class CompositeAddress {
    }
 
    public static SimpleString extractAddressName(SimpleString address) {
-      return address == null ? null : new SimpleString(extractAddressName(address.toString()));
+      if (address == null) {
+         return null;
+      }
+      final String addrString = address.toString();
+      final String addressName = extractAddressName(addrString);
+      if (addressName.equals(addrString)) {
+         return address;
+      }
+      return new SimpleString(addressName);
    }
 
    public static String extractAddressName(String address) {
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index c6b513d..223da17 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -372,6 +372,79 @@ public class TypedProperties {
       }
    }
 
+   /**
+    * Performs a search among the valid key properties contained in {@code buffer}, starting from {@code from}
+    * assuming it to be a valid encoded {@link TypedProperties} content.
+    *
+    * @throws IllegalStateException if any not-valid property is found while searching the {@code key} property
+    */
+   public static boolean searchProperty(SimpleString key, ByteBuf buffer, int startIndex) {
+      // It won't implement a straight linear search for key
+      // because it would risk to find a SimpleString encoded property value
+      // equals to the key we're searching for!
+      int index = startIndex;
+      byte b = buffer.getByte(index);
+      index++;
+      if (b == DataConstants.NULL) {
+         return false;
+      }
+      final int numHeaders = buffer.getInt(index);
+      index += Integer.BYTES;
+      for (int i = 0; i < numHeaders; i++) {
+         final int keyLength = buffer.getInt(index);
+         index += Integer.BYTES;
+         if (key.equals(buffer, index, keyLength)) {
+            return true;
+         }
+         if (i == numHeaders - 1) {
+            return false;
+         }
+         index += keyLength;
+         byte type = buffer.getByte(index);
+         index++;
+         switch (type) {
+            case NULL: {
+               break;
+            }
+            case CHAR:
+            case SHORT: {
+               index += Short.BYTES;
+               break;
+            }
+            case BOOLEAN:
+            case BYTE: {
+               index += Byte.BYTES;
+               break;
+            }
+            case BYTES:
+            case STRING: {
+               index += (Integer.BYTES + buffer.getInt(index));
+               break;
+            }
+            case INT: {
+               index += Integer.BYTES;
+               break;
+            }
+            case LONG: {
+               index += Long.BYTES;
+               break;
+            }
+            case FLOAT: {
+               index += Float.BYTES;
+               break;
+            }
+            case DOUBLE: {
+               index += Double.BYTES;
+               break;
+            }
+            default: {
+               throw ActiveMQUtilBundle.BUNDLE.invalidType(type);
+            }
+         }
+      }
+      return false;
+   }
+
    public synchronized void decode(final ByteBuf buffer,
                                    final TypedPropertiesDecoderPools keyValuePools) {
       byte b = buffer.readByte();
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index e876f00..e9e7322 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import static org.apache.activemq.artemis.utils.collections.TypedProperties.searchProperty;
+
 public class TypedPropertiesTest {
 
    private static void assertEqualsTypeProperties(final TypedProperties expected, final TypedProperties actual) {
@@ -250,6 +252,96 @@ public class TypedPropertiesTest {
       Assert.assertFalse(properties.clearInternalProperties());
    }
 
+   @Test
+   public void testSearchPropertyIfNone() {
+      TypedProperties props = new TypedProperties();
+      ByteBuf buf = Unpooled.buffer(Byte.BYTES, Byte.BYTES);
+      props.encode(buf);
+      buf.resetReaderIndex();
+      Assert.assertFalse("There is no property", searchProperty(SimpleString.toSimpleString(""), buf, 0));
+   }
+
+   @Test
+   public void testSearchAllProperties() {
+      TypedProperties props = new TypedProperties();
+      props.putByteProperty(RandomUtil.randomSimpleString(), RandomUtil.randomByte());
+      props.putBytesProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBytes());
+      props.putBytesProperty(RandomUtil.randomSimpleString(), null);
+      props.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean());
+      props.putShortProperty(RandomUtil.randomSimpleString(), RandomUtil.randomShort());
+      props.putIntProperty(RandomUtil.randomSimpleString(), RandomUtil.randomInt());
+      props.putLongProperty(RandomUtil.randomSimpleString(), RandomUtil.randomLong());
+      props.putFloatProperty(RandomUtil.randomSimpleString(), RandomUtil.randomFloat());
+      props.putDoubleProperty(RandomUtil.randomSimpleString(), RandomUtil.randomDouble());
+      props.putCharProperty(RandomUtil.randomSimpleString(), RandomUtil.randomChar());
+      props.putSimpleStringProperty(RandomUtil.randomSimpleString(), RandomUtil.randomSimpleString());
+      props.putSimpleStringProperty(RandomUtil.randomSimpleString(), null);
+      final SimpleString value = RandomUtil.randomSimpleString();
+      props.putSimpleStringProperty(RandomUtil.randomSimpleString(), value);
+      ByteBuf buf = Unpooled.buffer();
+      props.encode(buf);
+      buf.resetReaderIndex();
+      Assert.assertFalse(searchProperty(value, buf, 0));
+      props.forEachKey(key -> {
+         Assert.assertTrue(searchProperty(key, buf, 0));
+         Assert.assertTrue(searchProperty(SimpleString.toSimpleString(key.toString()), buf, 0));
+         // concat a string just to check if the search won't perform an eager search to find the string pattern
+         Assert.assertFalse(searchProperty(key.concat(" "), buf, 0));
+      });
+   }
+
+   @Test(expected = IndexOutOfBoundsException.class)
+   public void testSearchPartiallyEncodedBuffer() {
+      final int expectedLength = Integer.BYTES + Byte.BYTES;
+      ByteBuf buf = Unpooled.buffer(expectedLength, expectedLength);
+      buf.writeByte(DataConstants.NOT_NULL);
+      buf.writeInt(1);
+      buf.resetReaderIndex();
+      searchProperty(SimpleString.toSimpleString(" "), buf, 0);
+   }
+
+   @Test(expected = IndexOutOfBoundsException.class)
+   public void testSearchPartiallyEncodedString() {
+      final int expectedLength = Integer.BYTES + Byte.BYTES + Integer.BYTES;
+      ByteBuf buf = Unpooled.buffer(expectedLength, expectedLength);
+      buf.writeByte(DataConstants.NOT_NULL);
+      buf.writeInt(1);
+      //SimpleString::data length
+      buf.writeInt(2);
+      buf.resetReaderIndex();
+      searchProperty(SimpleString.toSimpleString("a"), buf, 0);
+   }
+
+   @Test(expected = IllegalStateException.class)
+   public void testSearchWithInvalidTypeBeforeEnd() {
+      ByteBuf buf = Unpooled.buffer();
+      buf.writeByte(DataConstants.NOT_NULL);
+      // fake 2 properties
+      buf.writeInt(2);
+      // 1 key with length 2
+      buf.writeInt(2);
+      buf.writeShort(3);
+      // invalid type
+      buf.writeByte(Byte.MIN_VALUE);
+      buf.resetReaderIndex();
+      searchProperty(SimpleString.toSimpleString(""), buf, 0);
+   }
+
+   @Test
+   public void testSearchWithInvalidTypeEnd() {
+      ByteBuf buf = Unpooled.buffer();
+      buf.writeByte(DataConstants.NOT_NULL);
+      // fake 1 property
+      buf.writeInt(1);
+      // 1 key with length 2
+      buf.writeInt(2);
+      buf.writeShort(3);
+      // invalid type
+      buf.writeByte(Byte.MIN_VALUE);
+      buf.resetReaderIndex();
+      Assert.assertFalse(searchProperty(SimpleString.toSimpleString(""), buf, 0));
+   }
+
    @Before
    public void setUp() throws Exception {
       props = new TypedProperties();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b6c7acb..1de8150 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -186,6 +186,14 @@ public interface Message {
       // only on core
    }
 
+   /**
+    * Search for the existence of the property: an implementor can save
+    * the message to be decoded, if possible.
+    */
+   default boolean hasScheduledDeliveryTime() {
+      return getScheduledDeliveryTime() != null;
+   }
+
    default RoutingType getRoutingType() {
       return null;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 45c4bdd..9050550 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -19,12 +19,14 @@ package org.apache.activemq.artemis.core.message.impl;
 
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.Set;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -133,7 +135,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    public CoreMessage initBuffer(final int initialMessageBufferSize) {
-      buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf();
+      buffer = Unpooled.buffer(initialMessageBufferSize);
 
       // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
       buffer.writeByte((byte) 0);
@@ -1085,6 +1087,37 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
+   public boolean hasScheduledDeliveryTime() {
+      return searchProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+   }
+
+   /**
+    * Differently from {@link #containsProperty(SimpleString)}, this method can save decoding the message,
+    * performing a search of the {@code key} property and falling back to {@link #containsProperty(SimpleString)}
+    * if not possible or if already decoded.
+    */
+   public boolean searchProperty(SimpleString key) {
+      Objects.requireNonNull(key, "key cannot be null");
+      TypedProperties properties = this.properties;
+      if (properties != null) {
+         return properties.containsProperty(key);
+      }
+      synchronized (this) {
+         final ByteBuf buffer = this.buffer;
+         // acquiring the lock here, although heavy-weight, is the safer way to do this,
+         // because we cannot trust that a racing thread won't modify buffer
+         if (buffer == null) {
+            throw new NullPointerException("buffer cannot be null");
+         }
+         final int propertiesLocation = this.propertiesLocation;
+         if (propertiesLocation < 0) {
+            throw new IllegalStateException("propertiesLocation = " + propertiesLocation);
+         }
+         return TypedProperties.searchProperty(key, buffer, propertiesLocation);
+      }
+   }
+
+   @Override
    public boolean containsProperty(final SimpleString key) {
       return getProperties().containsProperty(key);
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 66299af..effb7e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -41,11 +41,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.transaction.xa.Xid;
 
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -869,7 +871,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
                byte[] data = record.data;
 
-               ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+               // We can make this byte[] buffer releasable, because subsequent methods using it are not supposed
+               // to release it. It saves creating useless UnreleasableByteBuf wrappers
+               ChannelBufferWrapper buff = new ChannelBufferWrapper(Unpooled.wrappedBuffer(data), true);
 
                byte recordType = record.getUserRecordType();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index e96f808..f35e590 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1181,9 +1181,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
-      Long scheduledDeliveryTime = message.getScheduledDeliveryTime();
-      if (scheduledDeliveryTime != null) {
-         reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+      Long scheduledDeliveryTime;
+      if (message.hasScheduledDeliveryTime()) {
+         scheduledDeliveryTime = message.getScheduledDeliveryTime();
+         if (scheduledDeliveryTime != null) {
+            reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+         }
       }
 
       message.incrementDurableRefCount();
@@ -1433,7 +1436,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       Transaction tx = context.getTransaction();
 
-      Long deliveryTime = message.getScheduledDeliveryTime();
+      Long deliveryTime = null;
+      if (message.hasScheduledDeliveryTime()) {
+         deliveryTime = message.getScheduledDeliveryTime();
+      }
 
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
          PagingStore store = pagingManager.getPageStore(entry.getKey());