Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2020/02/09 17:53:52 UTC

[GitHub] [activemq-artemis] franz1981 opened a new pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

franz1981 opened a new pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975
 
 
   I've opened this PR for discussion.
   
   It currently brings four improvements:
   
   1. enables G1GC string deduplication
   2. uses `CoreMessageObjectPools` on journal loading to save allocations
   3. allows lazy scanning of AMQP message data on journal loading
   4. uses a half-brute-force search for `x-opt-delivery-time` to avoid scanning the message data
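
For point 4, the review diffs quoted in this thread use a Knuth-Morris-Pratt (KMP) search. The following is a minimal Java sketch of the same technique over a plain `byte[]` rather than Proton's `ReadableBuffer`, including the fail-fast check from the later revision (give up once fewer bytes remain than the needle still needs). `KmpSketch`, `failureTable` and `indexOf` are hypothetical names, not the PR's API.

```java
import java.nio.charset.StandardCharsets;

// Sketch of a Knuth-Morris-Pratt search over a plain byte[]; the PR's version reads
// from a Proton ReadableBuffer instead. Class and method names are hypothetical.
final class KmpSketch {

   // failure[k] = length of the longest proper prefix of needle[0, k) that is also a suffix of it
   static int[] failureTable(byte[] needle) {
      final int[] failure = new int[needle.length + 1];
      int j = 0;
      for (int i = 1; i < needle.length; i++) {
         while (j > 0 && needle[j] != needle[i]) {
            j = failure[j];
         }
         if (needle[j] == needle[i]) {
            j++;
         }
         failure[i + 1] = j;
      }
      return failure;
   }

   // Returns the index of the first occurrence of needle in haystack[start, end), or -1.
   static int indexOf(byte[] haystack, int start, int end, byte[] needle, int[] failure) {
      if (needle.length == 0) {
         return start;
      }
      int j = 0; // number of needle bytes currently matched
      for (int i = start; i < end; i++) {
         // fail fast: the bytes left cannot complete even the current partial match
         if (needle.length - j > end - i) {
            return -1;
         }
         while (j > 0 && needle[j] != haystack[i]) {
            j = failure[j]; // fall back to the longest prefix that is still matched
         }
         if (needle[j] == haystack[i]) {
            j++;
         }
         if (j == needle.length) {
            return i - needle.length + 1;
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      final byte[] needle = "x-opt-delivery-time".getBytes(StandardCharsets.US_ASCII);
      final byte[] haystack = "..x-opt-delivery-count..x-opt-delivery-time..".getBytes(StandardCharsets.US_ASCII);
      System.out.println(indexOf(haystack, 0, haystack.length, needle, failureTable(needle))); // 24
   }
}
```

Running `main` prints `24`, the offset of `x-opt-delivery-time` after the near-miss `x-opt-delivery-count`. The failure table lets the search resume from the longest matched prefix after a mismatch instead of re-reading bytes.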

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377311730
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   Once you pass the Header and Delivery Annotations (should they actually be there), then you will get either a Message Annotations section, one of the four possible sections that follow it, or nothing if the message contains no sections following the delivery annotations.  
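
A minimal sketch of the early exit described above, assuming the scan can see each section's AMQP 1.0 descriptor code (0x70 header, 0x71 delivery-annotations, 0x72 message-annotations, 0x73 to 0x78 for the sections that may follow). The input array and `SectionScanSketch` are hypothetical; the real code would obtain these codes from the Proton decoder.

```java
// Hypothetical sketch: locate Message Annotations using only section descriptor codes,
// stopping as soon as a section that must follow it is seen.
final class SectionScanSketch {
   static final long HEADER = 0x70L;
   static final long DELIVERY_ANNOTATIONS = 0x71L;
   static final long MESSAGE_ANNOTATIONS = 0x72L;

   // Returns the position of the Message Annotations section, or -1 if it is absent.
   static int findMessageAnnotations(long[] descriptors) {
      for (int i = 0; i < descriptors.length; i++) {
         final long code = descriptors[i];
         if (code == HEADER || code == DELIVERY_ANNOTATIONS) {
            continue; // these are the only sections allowed before Message Annotations
         }
         if (code == MESSAGE_ANNOTATIONS) {
            return i;
         }
         // properties, application-properties, a body section or the footer:
         // Message Annotations could only have appeared earlier, so stop here
         return -1;
      }
      return -1;
   }

   public static void main(String[] args) {
      System.out.println(findMessageAnnotations(new long[] {0x70, 0x73, 0x75}));       // -1
      System.out.println(findMessageAnnotations(new long[] {0x70, 0x71, 0x72, 0x75})); // 2
   }
}
```

In the first call the scan stops at the properties section (0x73) without ever looking at the body.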

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377832853
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1263,129 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
+    *
+    * This version differ from the original algorithm, because allows to fail fast (and faster) if
+    * the remaining haystack to be processed is < of the remaining needle to be matched.
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      assert end >= 0 && start >= 0 && end >= start;
+      final int length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      int remainingNeedle = needleLength;
+      for (int i = 0; i < length; i++) {
+         final int remainingHayStack = length - i;
+         if (remainingNeedle > remainingHayStack) {
+            return -1;
+         }
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+            remainingNeedle = needleLength - j;
+         }
+         if (needle[j] == value) {
+            j++;
+            remainingNeedle--;
+            assert remainingNeedle >= 0;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
 
 Review comment:
   Uuuh, that's much better indeed, thanks!
   Error cases? I will take a look at AMQPMessage to see if there are any, or could you point me to some? 

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377315963
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   Great, tomorrow I will add a commit to improve that, plus some tests.
   My other concern is related to the lazy message scan: I've introduced a whole new state in the message, and I hope it won't break some earlier assumptions (`routingType` now forces a message scan, while before it was assumed to always be correctly populated).
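
One way to picture the new state is the following hypothetical sketch (not the PR's code): the message keeps only its raw bytes, and every accessor that used to rely on an eagerly populated field, such as a routing type, has to call an `ensureScanned()` guard first. The `NOT_SCANNED`/`SCANNED` constants mirror the `messageDataScanned` field seen in the diff; the decoding itself is a stand-in.

```java
// Hypothetical sketch, not the PR's code: a message that defers decoding until a
// getter needs decoded data. Not thread-safe.
final class LazyMessageSketch {
   private static final int NOT_SCANNED = 0;
   private static final int SCANNED = 1;

   private final byte[] data;
   private int messageDataScanned = NOT_SCANNED;
   private String routingType; // only valid once scan() has run
   int scanCount;              // illustration only: number of full scans performed

   LazyMessageSketch(byte[] data) {
      this.data = data;
   }

   private void ensureScanned() {
      if (messageDataScanned != SCANNED) {
         scan();
         messageDataScanned = SCANNED;
      }
   }

   // stand-in for decoding the real AMQP sections
   private void scan() {
      scanCount++;
      routingType = data.length > 0 && data[0] == 1 ? "ANYCAST" : "MULTICAST";
   }

   // before lazy scanning this could return the field directly; now it must
   // trigger the scan, otherwise callers would see null
   String getRoutingType() {
      ensureScanned();
      return routingType;
   }
}
```

The first `getRoutingType()` call performs the scan and later calls reuse the result, so `scanCount` stays at 1. Any accessor that reads the field without going through `ensureScanned()` is exactly the kind of earlier assumption that the new state can break.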

[GitHub] [activemq-artemis] franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875107
 
 
   @tabish121 @gemmellr @clebertsuconic @michaelandrepearce 
   
   This PR contains many changes and I know there are parts that could be simplified a lot.
   The most important bits, which can really be a game-changer for AMQP on journal loading, are in https://github.com/apache/activemq-artemis/pull/2975/commits/fc77a546ca1ee047218e938f0413c55ed8c838c6.
   
   So please review that part especially.
   
   An important note: 
   I've found that `AMQPMessagePersisterV2::decode` does something weird regarding `AMQPMessage::setAddress`: `AMQPMessagePersister::decode` can call `AMQPMessage::setAddress`, which allocates `AMQPMessage::extraProperties`, but 
   `AMQPMessagePersisterV2::decode` can then decode a new `extraProperties`, completely overwriting the existing one. Is it a bug, or is it supposed to work like that?
   I see that this behaviour was introduced by `ARTEMIS-1858` in 1ae2784dc6075875b18780fa8ba40f86cb895f7b, with this comment:
   ```java
      /**
       * This will set the address on CoreMessage.
       *
       * Note for AMQPMessages:
       * in AMQPMessages this will not really change the address on the message. Instead it will add a property
       * on extraProperties which only transverse internally at the broker.
       * Whatever you change here it won't affect anything towards the received message.
       *
       * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
       * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
       * @param address
       * @return
       */
      Message setAddress(SimpleString address);
   ```
   I've tried to mitigate this by using `CoreMessageObjectPools` to save `SimpleString` allocations [here](https://github.com/franz1981/activemq-artemis/blob/918800b309009fd440bf83b69b54a0d4f49db940/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java#L78) and by allowing `TypedProperties::decode` to append/replace any existing `extraProperty`, saving a `HashMap` allocation, but IMO it should be addressed with a proper fix. WDYT? Any idea how?
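
A hypothetical sketch of the two decode behaviours discussed above, using a plain `Map` in place of `TypedProperties` and a made-up `_address` key: replacing the map drops the entry written by the earlier `setAddress` step, while merging into the existing map keeps it and skips one `HashMap` allocation. Whether dropping it is actually a bug depends on what the V2 record persisted, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Map<String, Object> stands in for TypedProperties and
// "_address" is a made-up key for whatever setAddress() records.
final class ExtraPropertiesSketch {
   Map<String, Object> extraProperties;

   // like setAddress() during the V1 decode step: lazily allocates the map
   void setAddress(String address) {
      if (extraProperties == null) {
         extraProperties = new HashMap<>();
      }
      extraProperties.put("_address", address);
   }

   // overwrite semantics: the decoded properties replace whatever was there
   void decodeReplacing(Map<String, Object> decoded) {
      extraProperties = new HashMap<>(decoded);
   }

   // append/replace semantics: decoded entries are merged into the existing map,
   // reusing it instead of allocating a new one
   void decodeMerging(Map<String, Object> decoded) {
      if (extraProperties == null) {
         extraProperties = new HashMap<>(decoded);
      } else {
         extraProperties.putAll(decoded);
      }
   }
}
```

With `decodeReplacing` the `_address` entry is gone afterwards; with `decodeMerging` both the address and the decoded entries survive, and a decoded entry with the same key would still win.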

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377296935
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   How could we quickly search for this case? Do you think it would be beneficial to have such a method directly in Proton? 

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377264879
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
 ##########
 @@ -156,6 +157,72 @@ public void testCreateMessageForPersistenceDataReload() throws ActiveMQException
       assertEquals(TEST_TO_ADDRESS, message.getAddress());
    }
 
+   @Test
 
 Review comment:
   The tests seem a bit simplistic for such a change. What about annotations sections with many different annotations, annotations that don't match what you are looking for, or ones that are very, very close but not quite what you are looking for, etc.?
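
A near-miss case along these lines, as a hedged sketch: assuming the quick check matches the raw ASCII bytes of `x-opt-delivery-time`, a longer key such as the hypothetical `x-opt-delivery-time-extra` produces a false positive, whereas matching the full sym8 encoding (format code `0xa3`, one length byte, then the ASCII bytes) does not. The helpers are hypothetical and only symbol keys are concatenated; a real annotations map also contains values, which could still contain the encoded bytes, so a match would need to be confirmed by decoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Simplified sketch: only symbol keys are concatenated here; a real message-annotations
// section also has a map constructor, sizes and values between the keys.
final class NearMissSketch {

   // AMQP 1.0 sym8 encoding: format code 0xa3, one length byte, then the ASCII bytes
   static byte[] sym8(String symbol) {
      final byte[] ascii = symbol.getBytes(StandardCharsets.US_ASCII);
      final byte[] out = new byte[ascii.length + 2];
      out[0] = (byte) 0xa3;
      out[1] = (byte) ascii.length;
      System.arraycopy(ascii, 0, out, 2, ascii.length);
      return out;
   }

   static byte[] concat(byte[]... parts) {
      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      for (byte[] part : parts) {
         out.write(part, 0, part.length);
      }
      return out.toByteArray();
   }

   // brute-force reference search, good enough for building test expectations
   static boolean contains(byte[] haystack, byte[] needle) {
      outer:
      for (int i = 0; i + needle.length <= haystack.length; i++) {
         for (int k = 0; k < needle.length; k++) {
            if (haystack[i + k] != needle[k]) {
               continue outer;
            }
         }
         return true;
      }
      return false;
   }

   public static void main(String[] args) {
      final byte[] raw = "x-opt-delivery-time".getBytes(StandardCharsets.US_ASCII);
      final byte[] encoded = sym8("x-opt-delivery-time");
      // near misses: a sibling key, a longer key sharing the prefix, a key differing in one byte
      final byte[] nearMisses = concat(sym8("x-opt-delivery-count"),
                                       sym8("x-opt-delivery-time-extra"),
                                       sym8("y-opt-delivery-time"));
      System.out.println(contains(nearMisses, raw));                      // true: false positive
      System.out.println(contains(nearMisses, encoded));                  // false
      System.out.println(contains(concat(nearMisses, encoded), encoded)); // true
   }
}
```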

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377264471
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   You should account for the message not having any MessageAnnotations; otherwise you will scan every type constructor of the follow-on sections (if present) for no reason. There's no requirement that a message have a Message Annotations section.  

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377849841
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1263,129 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
+    *
+    * This version differ from the original algorithm, because allows to fail fast (and faster) if
+    * the remaining haystack to be processed is < of the remaining needle to be matched.
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      assert end >= 0 && start >= 0 && end >= start;
+      final int length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      int remainingNeedle = needleLength;
+      for (int i = 0; i < length; i++) {
+         final int remainingHayStack = length - i;
+         if (remainingNeedle > remainingHayStack) {
+            return -1;
+         }
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+            remainingNeedle = needleLength - j;
+         }
+         if (needle[j] == value) {
+            j++;
+            remainingNeedle--;
+            assert remainingNeedle >= 0;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
 
 Review comment:
   In this case, throwing an `IllegalStateException` is probably the safer choice.

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377296935
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   How can we quickly search for this case? Do you think it would be beneficial to have such a method directly in Proton?

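A minimal, self-contained sketch of the KMP search quoted in the diff above, adapted to a plain `byte[]` haystack instead of Proton's `ReadableBuffer` so it runs without Proton on the classpath. It keeps the diff's shortcut of returning `null` for an all-zero jump table and includes the fail-fast check from a later revision of this PR. `KmpSketch` is a hypothetical name; this is an illustration, not an existing Proton API.

```java
import java.nio.charset.StandardCharsets;

public final class KmpSketch {

   /** Builds the KMP jump (failure) table; returns null when every entry is 0. */
   public static int[] createJumpTable(byte[] needle) {
      final int[] jumpTable = new int[needle.length + 1];
      int j = 0;
      for (int i = 1; i < needle.length; i++) {
         while (j > 0 && needle[j] != needle[i]) {
            j = jumpTable[j];
         }
         if (needle[j] == needle[i]) {
            j++;
         }
         jumpTable[i + 1] = j; // longest proper prefix of needle that is also a suffix of needle[0..i]
      }
      for (int i = 1; i < jumpTable.length; i++) {
         if (jumpTable[i] != 0) {
            return jumpTable;
         }
      }
      return null; // null means "on mismatch, restart from needle[0]"
   }

   /** Returns the start index of the first match of needle in haystack[start, end), or -1. */
   public static int indexOf(byte[] haystack, int start, int end, byte[] needle, int[] jumpTable) {
      final int needleLength = needle.length;
      if (needleLength == 0) {
         return start;
      }
      int j = 0; // number of needle bytes matched so far
      for (int index = start; index < end; index++) {
         // fail fast: the remaining haystack is shorter than the unmatched part of the needle
         if (needleLength - j > end - index) {
            return -1;
         }
         final byte value = haystack[index];
         while (j > 0 && needle[j] != value) {
            j = jumpTable == null ? 0 : jumpTable[j];
         }
         if (needle[j] == value) {
            j++;
         }
         if (j == needleLength) {
            return index - needleLength + 1;
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      byte[] needle = "x-opt-delivery-time".getBytes(StandardCharsets.US_ASCII);
      byte[] haystack = "..x-opt-delivery-time..".getBytes(StandardCharsets.US_ASCII);
      int[] table = createJumpTable(needle);
      System.out.println(indexOf(haystack, 0, haystack.length, needle, table)); // prints 2
   }
}
```

Since `x-opt-delivery-time` has no prefix that repeats inside it, its jump table is all zeros and `createJumpTable` returns `null`, so a mismatch simply restarts the match at `needle[0]`.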

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377327729
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   Wouldn't it be nice if the journal entry stored all the needed metadata in an easily parsed canonical format, so that all protocols could benefit from the same optimizations? :)

[GitHub] [activemq-artemis] franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583890886
 
 
   In addition, I hope I haven't missed forcing message data scanning in the getters of other properties, e.g. `routingType`.

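The concern above is the usual lazy-initialisation hazard: every getter that reads scanned state must force the scan first. A minimal sketch of that pattern, with hypothetical names (`LazyMessageSketch`, `ensureScanned`, `parse`) and a toy `key=value;` payload standing in for the encoded AMQP sections; it is not `AMQPMessage`'s actual code.

```java
import java.util.HashMap;
import java.util.Map;

public final class LazyMessageSketch {
   private final String rawData;        // stand-in for the encoded message bytes
   private Map<String, String> scanned; // null until the data has been scanned
   private int scanCount;               // how many times the full scan actually ran

   public LazyMessageSketch(String rawData) {
      this.rawData = rawData;
   }

   // Every getter that depends on scanned data must go through this method.
   private Map<String, String> ensureScanned() {
      if (scanned == null) {
         scanned = parse(rawData);
         scanCount++;
      }
      return scanned;
   }

   // Toy parser for "key=value;key=value" input.
   private static Map<String, String> parse(String data) {
      Map<String, String> result = new HashMap<>();
      for (String entry : data.split(";")) {
         int eq = entry.indexOf('=');
         if (eq > 0) {
            result.put(entry.substring(0, eq), entry.substring(eq + 1));
         }
      }
      return result;
   }

   public String getRoutingType() {
      return ensureScanned().get("routingType");
   }

   public String getAddress() {
      return ensureScanned().get("address");
   }

   public int scanCount() {
      return scanCount;
   }
}
```

The scan runs at most once, on the first getter that needs it; a getter that bypasses `ensureScanned()` would silently read `null`, which is exactly the kind of miss the comment worries about. The sketch is not thread-safe.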

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377322537
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   Luckily, the routing type does not seem to be needed on journal loading (we probably read it from other bits of the journal record); the only message information required there seems to be the scheduled delivery time.

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-584611326
 
 
   @tabish121 I've tried to improve the fail-fast behaviour of both the `Symbol` search algorithm and the `MessageAnnotations` section search. The latter now uses an `IdentityHashMap` to avoid checking, one by one, the N possible message sections that would terminate the search: let me know if that makes sense according to the AMQP spec :+1: 
   
   NOTE: I still have to add more tests.

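A rough sketch of the `IdentityHashMap` idea, assuming sections are encoded in the order the AMQP 1.0 spec mandates (header, delivery-annotations, message-annotations, properties, application-properties, body, footer), so any section that can only follow `MessageAnnotations` ends the search. Empty stand-in classes replace Proton's section types so the example runs on its own; `SectionScanSketch` and `containsMessageAnnotations` are hypothetical names.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public final class SectionScanSketch {
   // Hypothetical stand-ins for Proton's section classes.
   public static final class Header { }
   public static final class DeliveryAnnotations { }
   public static final class MessageAnnotations { }
   public static final class Properties { }
   public static final class ApplicationProperties { }
   public static final class Data { }
   public static final class AmqpSequence { }
   public static final class AmqpValue { }
   public static final class Footer { }

   // Sections that can only appear after MessageAnnotations: seeing one ends the search.
   private static final Set<Class<?>> TERMINATORS = Collections.newSetFromMap(new IdentityHashMap<>());

   static {
      TERMINATORS.add(Properties.class);
      TERMINATORS.add(ApplicationProperties.class);
      TERMINATORS.add(Data.class);
      TERMINATORS.add(AmqpSequence.class);
      TERMINATORS.add(AmqpValue.class);
      TERMINATORS.add(Footer.class);
   }

   /** Walks section types in encoded order; one identity lookup replaces N equals() checks. */
   public static boolean containsMessageAnnotations(List<Class<?>> sectionTypes) {
      for (Class<?> type : sectionTypes) {
         if (type == MessageAnnotations.class) {
            return true;
         }
         if (TERMINATORS.contains(type)) {
            return false; // MessageAnnotations cannot appear after this section
         }
         // Header or DeliveryAnnotations: keep scanning
      }
      return false;
   }
}
```

Class objects are singletons per class loader, so identity comparison is sufficient here, and `IdentityHashMap` avoids calling `hashCode()`/`equals()` on each lookup.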

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875107
 
 
   @tabish121 @gemmellr @clebertsuconic @michaelandrepearce 
   
   This PR contains many changes and I know there are parts that could be simplified a lot.
   The most important part, which could be a real game-changer for AMQP journal loading, is in https://github.com/apache/activemq-artemis/pull/2975/commits/fc77a546ca1ee047218e938f0413c55ed8c838c6.
   
   Please review that part in particular.
   
   An important note: 
   I've found that `AMQPMessagePersisterV2::decode` does something odd with `AMQPMessage::setAddress`: `AMQPMessagePersister::decode` may call `AMQPMessage::setAddress`, which allocates `AMQPMessage::extraProperties`, but `AMQPMessagePersisterV2::decode` can then decode a new `extraProperties`, overwriting the existing one. Is this a bug, or is it supposed to work like that?
   I see that this behaviour was introduced by `ARTEMIS-1858` in 1ae2784dc6075875b18780fa8ba40f86cb895f7b with this comment:
   ```java
      /**
       * This will set the address on CoreMessage.
       *
       * Note for AMQPMessages:
       * in AMQPMessages this will not really change the address on the message. Instead it will add a property
       * on extraProperties which only transverse internally at the broker.
       * Whatever you change here it won't affect anything towards the received message.
       *
       * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
       * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
       * @param address
       * @return
       */
      Message setAddress(SimpleString address);
   ```
   I've tried to mitigate this by using `CoreMessageObjectPools` to save `SimpleString` allocations [here](https://github.com/franz1981/activemq-artemis/blob/918800b309009fd440bf83b69b54a0d4f49db940/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java#L78) and by allowing `TypedProperties::decode` to append to or replace any existing `extraProperty`, saving a `HashMap` allocation, but IMO it should be addressed with a proper fix. WDYT? Any idea how?

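To make the question concrete, a sketch of the two behaviours described above, using plain `HashMap`s in place of `TypedProperties`. `ExtraPropertiesSketch`, `decodeReplacing`, `decodeMerging`, and the `"address"` key are all hypothetical; the real persisters and property keys differ.

```java
import java.util.HashMap;
import java.util.Map;

public final class ExtraPropertiesSketch {
   private Map<String, Object> extraProperties; // null until something is set

   // As described for AMQPMessagePersister::decode: setAddress allocates extraProperties.
   public void setAddress(String address) {
      if (extraProperties == null) {
         extraProperties = new HashMap<>();
      }
      extraProperties.put("address", address);
   }

   // As described for AMQPMessagePersisterV2::decode: a freshly decoded map replaces the old one.
   public void decodeReplacing(Map<String, Object> decoded) {
      extraProperties = new HashMap<>(decoded);
   }

   // The mitigation described: append to / replace entries in the existing map instead.
   public void decodeMerging(Map<String, Object> decoded) {
      if (extraProperties == null) {
         extraProperties = new HashMap<>(decoded);
      } else {
         extraProperties.putAll(decoded);
      }
   }

   public Object get(String key) {
      return extraProperties == null ? null : extraProperties.get(key);
   }
}
```

With `decodeReplacing`, the address set earlier is lost; with `decodeMerging`, it survives and no second map is allocated, but the decoded value wins whenever both sides set the same key.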

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377771553
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1263,129 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
+    *
+    * This version differ from the original algorithm, because allows to fail fast (and faster) if
+    * the remaining haystack to be processed is < of the remaining needle to be matched.
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      assert end >= 0 && start >= 0 && end >= start;
+      final int length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      int remainingNeedle = needleLength;
+      for (int i = 0; i < length; i++) {
+         final int remainingHayStack = length - i;
+         if (remainingNeedle > remainingHayStack) {
+            return -1;
+         }
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+            remainingNeedle = needleLength - j;
+         }
+         if (needle[j] == value) {
+            j++;
+            remainingNeedle--;
+            assert remainingNeedle >= 0;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
 
 Review comment:
   As an alternative to the `IdentityHashMap`, you might try casting the `TypeConstructor` to an `AbstractDescribedType` (which all valid sections in the message would be) and then getting the descriptor code, which is an `UnsignedLong`. The descriptor codes for the sections are:
   
   - Header (0x70)
   - DeliveryAnnotations (0x71)
   - MessageAnnotations (0x72)
   - Properties (0x73)
   - ApplicationProperties (0x74)
   - Data (0x75)
   - AmqpSequence (0x76)
   - AmqpValue (0x77)
   - Footer (0x78)
   
   You'd need to handle the error cases, but that could make for simpler logic: the section you want is 0x72, so keep going if the code is lower and stop if it is higher.

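A minimal sketch of the descriptor-code approach, using the AMQP 1.0 section descriptor codes 0x70 to 0x78 listed above. It works on an array of already-extracted descriptor codes rather than Proton's `TypeConstructor`/`AbstractDescribedType`, so the decoding step itself is left out. `DescriptorScanSketch` and `findMessageAnnotations` are hypothetical names; throwing `IllegalStateException` on an unknown code follows the earlier suggestion in this thread.

```java
public final class DescriptorScanSketch {
   static final long HEADER = 0x70L;
   static final long MESSAGE_ANNOTATIONS = 0x72L;
   static final long FOOTER = 0x78L;

   /**
    * Returns the position of the MessageAnnotations section in a sequence of section
    * descriptor codes, or -1 if it is absent. Codes are assumed to be in the order the
    * AMQP 1.0 spec mandates, so the scan stops at the first code greater than 0x72.
    */
   public static int findMessageAnnotations(long[] descriptorCodes) {
      for (int i = 0; i < descriptorCodes.length; i++) {
         final long code = descriptorCodes[i];
         if (code < HEADER || code > FOOTER) {
            // not a message section: treat the persisted data as corrupt
            throw new IllegalStateException("Unexpected section descriptor: 0x" + Long.toHexString(code));
         }
         if (code == MESSAGE_ANNOTATIONS) {
            return i;
         }
         if (code > MESSAGE_ANNOTATIONS) {
            return -1; // MessageAnnotations cannot appear after this section
         }
         // Header (0x70) or DeliveryAnnotations (0x71): keep going
      }
      return -1;
   }
}
```

Because the codes are contiguous and ordered, a single numeric comparison replaces both the per-class checks and the lookup table.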

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875107
 
 
   @tabish121 @gemmellr @clebertsuconic @michaelandrepearce 
   
   This PR contains many changes and I know there are parts that could be simplified a lot.
   The most important part, which could be a real game-changer for AMQP journal loading, is in https://github.com/apache/activemq-artemis/pull/2975/commits/40a421e30588323e15e2a08e15894b2e6d30307b.
   
   Please review that part in particular.
   
   An important note: 
   I've found that `AMQPMessagePersisterV2::decode` does something odd with `AMQPMessage::setAddress`: `AMQPMessagePersister::decode` may call `AMQPMessage::setAddress`, which allocates `AMQPMessage::extraProperties`, but `AMQPMessagePersisterV2::decode` can then decode a new `extraProperties`, overwriting the existing one. Is this a bug, or is it supposed to work like that?
   I see that this behaviour was introduced by `ARTEMIS-1858` in 1ae2784dc6075875b18780fa8ba40f86cb895f7b with this comment:
   ```java
      /**
       * This will set the address on CoreMessage.
       *
       * Note for AMQPMessages:
       * in AMQPMessages this will not really change the address on the message. Instead it will add a property
       * on extraProperties which only transverse internally at the broker.
       * Whatever you change here it won't affect anything towards the received message.
       *
       * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
       * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
       * @param address
       * @return
       */
      Message setAddress(SimpleString address);
   ```
   I've tried to mitigate this by using `CoreMessageObjectPools` to save `SimpleString` allocations [here](https://github.com/franz1981/activemq-artemis/blob/918800b309009fd440bf83b69b54a0d4f49db940/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java#L78) and by allowing `TypedProperties::decode` to append to or replace any existing `extraProperty`, saving a `HashMap` allocation, but IMO it should be addressed with a proper fix. WDYT? Any idea how?

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875107
 
 
   @tabish121 @gemmellr @clebertsuconic @michaelandrepearce 
   
   This PR contains many changes and I know there are parts that could be simplified a lot.
   The most important part, which could be a real game-changer for AMQP journal loading, is in https://github.com/apache/activemq-artemis/pull/2975/commits/40a421e30588323e15e2a08e15894b2e6d30307b.
   
   Please review that part in particular.
   
   An important note: 
   I've found that `AMQPMessagePersisterV2::decode` does something odd with `AMQPMessage::setAddress`: `AMQPMessagePersister::decode` may call `AMQPMessage::setAddress`, which allocates `AMQPMessage::extraProperties`, but `AMQPMessagePersisterV2::decode` can then decode a new `extraProperties`, overwriting the existing one. Is this a bug, or is it supposed to work like that?
   I see that this behaviour was introduced by `ARTEMIS-1858` in 1ae2784dc6075875b18780fa8ba40f86cb895f7b with this comment:
   ```java
      /**
       * This will set the address on CoreMessage.
       *
       * Note for AMQPMessages:
       * in AMQPMessages this will not really change the address on the message. Instead it will add a property
       * on extraProperties which only transverse internally at the broker.
       * Whatever you change here it won't affect anything towards the received message.
       *
       * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
       * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
       * @param address
       * @return
       */
      Message setAddress(SimpleString address);
   ```
   I've tried to mitigate this by using `CoreMessageObjectPools` to save `SimpleString` allocations [here](https://github.com/franz1981/activemq-artemis/blob/9ea4aa1a36271fcf30dfe0e3eaa667a237472ea5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java#L78) and by allowing `TypedProperties::decode` to append to or replace any existing `extraProperty`, saving a `HashMap` allocation, but IMO it should be addressed with a proper fix. WDYT? Any idea how?

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377843580
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1263,129 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
+    *
+    * This version differ from the original algorithm, because allows to fail fast (and faster) if
+    * the remaining haystack to be processed is < of the remaining needle to be matched.
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      assert end >= 0 && start >= 0 && end >= start;
+      final int length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      int remainingNeedle = needleLength;
+      for (int i = 0; i < length; i++) {
+         final int remainingHayStack = length - i;
+         if (remainingNeedle > remainingHayStack) {
+            return -1;
+         }
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+            remainingNeedle = needleLength - j;
+         }
+         if (needle[j] == value) {
+            j++;
+            remainingNeedle--;
+            assert remainingNeedle >= 0;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
 
 Review comment:
   Basically, you need to deal with the case where the `TypeConstructor` you get back isn't an `AbstractDescribedType` for some unknown reason. It should have blown up on ingress, but you never know.

[GitHub] [activemq-artemis] tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
tabish121 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377320755
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   As far as I recall, the broker decodes the message annotations of each message on receive, specifically to read the routing type and the delivery time. I'm unsure whether the routing type is needed on reload from the journal, but I wouldn't be surprised if it gets called at some point.
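   Here is a minimal sketch of that receive-time read. It covers only the delivery time and leaves the routing type out. The helper `ScheduledDelivery` is hypothetical, and annotation keys are plain `String`s rather than proton `Symbol`s. The sketch also rests on three assumptions: `x-opt-delivery-time` carries an absolute epoch-millis value, `x-opt-delivery-delay` is relative to the receive time, and the absolute time wins when both are present.

   ```java
   import java.util.Map;

   /**
    * Hypothetical helper: derives a scheduled delivery time from already-decoded annotations.
    * Assumptions: "x-opt-delivery-time" is absolute epoch millis, "x-opt-delivery-delay" is
    * relative to the receive time, and the absolute time takes precedence.
    */
   final class ScheduledDelivery {
      static final String DELIVERY_TIME = "x-opt-delivery-time";
      static final String DELIVERY_DELAY = "x-opt-delivery-delay";

      /** Returns the scheduled time in millis, or -1 when neither annotation is present. */
      static long scheduledTime(Map<String, Object> annotations, long receiveTimeMillis) {
         if (annotations == null) {
            return -1;
         }
         final Object time = annotations.get(DELIVERY_TIME);
         if (time instanceof Number) {
            return ((Number) time).longValue();
         }
         final Object delay = annotations.get(DELIVERY_DELAY);
         if (delay instanceof Number) {
            return receiveTimeMillis + ((Number) delay).longValue();
         }
         return -1;
      }
   }
   ```

   Passing the receive time in, instead of reading the clock inside, keeps the helper deterministic and easy to test.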

[GitHub] [activemq-artemis] franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-584611326
 
 
   @tabish121 I've tried to improve the fail-fast behaviour of both the search algorithm and the `MessageAnnotations` section search. The latter uses an `IdentityHashMap` to avoid checking, one by one, the N possible message sections that terminate the search. Let me know if that makes sense according to the AMQP spec :+1: 
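   The fail-fast `MessageAnnotations` lookup can be sketched under two assumptions: section types are hypothetical marker classes rather than proton-j's types, and the sections arrive in wire order. The AMQP 1.0 message format places properties, application-properties, the body sections and the footer after message-annotations, so meeting any of them ends the search. An `IdentityHashMap` keyed by `Class` answers "is this a terminator?" with a single identity lookup.

   ```java
   import java.util.IdentityHashMap;
   import java.util.Map;

   /** Sketch only: hypothetical marker classes stand in for proton-j's section types. */
   final class SectionScan {
      static final class Header {}
      static final class DeliveryAnnotations {}
      static final class MessageAnnotations {}
      static final class Properties {}
      static final class ApplicationProperties {}
      static final class Data {}
      static final class AmqpSequence {}
      static final class AmqpValue {}
      static final class Footer {}

      // Class objects are unique per class loader, so identity comparison is sufficient.
      private static final Map<Class<?>, Boolean> TERMINATORS = new IdentityHashMap<>();

      static {
         TERMINATORS.put(Properties.class, Boolean.TRUE);
         TERMINATORS.put(ApplicationProperties.class, Boolean.TRUE);
         TERMINATORS.put(Data.class, Boolean.TRUE);
         TERMINATORS.put(AmqpSequence.class, Boolean.TRUE);
         TERMINATORS.put(AmqpValue.class, Boolean.TRUE);
         TERMINATORS.put(Footer.class, Boolean.TRUE);
      }

      /** Returns the position of the message-annotations section, or -1 if it cannot be present. */
      static int indexOfMessageAnnotations(Class<?>... sections) {
         for (int i = 0; i < sections.length; i++) {
            final Class<?> type = sections[i];
            if (type == MessageAnnotations.class) {
               return i;
            }
            if (TERMINATORS.containsKey(type)) {
               return -1; // a section that must follow message-annotations: stop early
            }
         }
         return -1;
      }
   }
   ```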

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583890886
 
 
   In addition, I hope I haven't missed forcing message data scanning in the getters of properties other than `scheduledDeliveryTime`, e.g. `routingType`.
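   The risk shows up clearly in a hypothetical lazily-decoded message. `LazyMessage` below is illustrative, not `AMQPMessage`, and its "decoding" is a placeholder. Every getter whose value lives in the encoded sections must call the scan guard first. A getter that forgets it silently returns a default instead of the stored value.

   ```java
   import java.nio.charset.StandardCharsets;

   /** Hypothetical lazily-scanned message; the payload format "routingType|time" is made up. */
   final class LazyMessage {
      private final byte[] encoded;
      private boolean scanned;
      private String routingType;      // only valid after scan()
      private long scheduledTime = -1; // only valid after scan()
      private int scanCount;

      LazyMessage(byte[] encoded) {
         this.encoded = encoded;
      }

      private void ensureScanned() {
         if (!scanned) {
            scan();
            scanned = true;
         }
      }

      private void scan() {
         scanCount++;
         // Placeholder for real section decoding.
         final String[] parts = new String(encoded, StandardCharsets.US_ASCII).split("\\|");
         routingType = parts[0];
         scheduledTime = Long.parseLong(parts[1]);
      }

      // Each getter backed by encoded data must go through ensureScanned().
      String getRoutingType() {
         ensureScanned();
         return routingType;
      }

      long getScheduledTime() {
         ensureScanned();
         return scheduledTime;
      }

      int scanCount() {
         return scanCount;
      }
   }
   ```

   This sketch is not thread-safe. A real message would also have to decide whether lazy scanning can race with concurrent readers.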

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875806
 
 
   @michaelandrepearce I suppose you are interested in numbers. In my synthetic tests, a 2 GB journal of very small messages (chosen to create tons of garbage from the per-message object overhead) couldn't be loaded by a broker with a `4GB` heap and always led to an OOM. Now it can be loaded without significant GC pauses, e.g.:
   ![image](https://user-images.githubusercontent.com/13125299/74107402-d548ea00-4b6f-11ea-8c00-d62b22082282.png)
   
   We're not quite at the level of Core messages yet, but very close: I think the duplicated `address` is responsible for the difference.
   
   The commit that half-brute-force searches for `x-opt-delivery-time` is the one I'm not sure about:
   
   - IMO that search should ideally be implemented, optimally, by Proton itself (nudge nudge @tabish121 @gemmellr), but I'd be super happy to provide a PR for it
   - it can currently give rare false positives (causing message scanning when it isn't needed)
   - I've yet to verify how it behaves with many message annotations and longer messages
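   To make the false-positive point concrete, here is a standalone port of the KMP search from the diff. It assumes a plain `byte[]` haystack instead of proton's `ReadableBuffer`. The search matches raw bytes, so it cannot tell whether `x-opt-delivery-time` occurs as an annotation key, as a value, or inside the body. This particular needle also has no self-overlap (its only `x` is the first byte), so its failure table is all zeros and the `null` shortcut applies.

   ```java
   /** Standalone port (over byte[]) of the KMP search discussed in this PR. */
   final class BytesKmp {
      /** Failure table shifted by one: table[i + 1] covers needle[0..i]. Null if all zeros. */
      static int[] createJumpTable(byte[] needle) {
         final int[] jumpTable = new int[needle.length + 1];
         int j = 0;
         for (int i = 1; i < needle.length; i++) {
            while (j > 0 && needle[j] != needle[i]) {
               j = jumpTable[j];
            }
            if (needle[j] == needle[i]) {
               j++;
            }
            jumpTable[i + 1] = j;
         }
         for (int v : jumpTable) {
            if (v != 0) {
               return jumpTable;
            }
         }
         return null; // no self-overlap: any mismatch restarts from needle[0]
      }

      /** Index of the first occurrence of needle in haystack, or -1. */
      static int indexOf(byte[] haystack, byte[] needle, int[] jumpTable) {
         if (needle.length == 0) {
            return 0; // convention: the empty needle matches at the start
         }
         int j = 0;
         for (int i = 0; i < haystack.length; i++) {
            final byte value = haystack[i];
            while (j > 0 && needle[j] != value) {
               j = jumpTable == null ? 0 : jumpTable[j];
            }
            if (needle[j] == value) {
               j++;
            }
            if (j == needle.length) {
               return i - needle.length + 1;
            }
         }
         return -1;
      }
   }
   ```

   Running it on a body that merely mentions the symbol in plain text still reports a match. That is exactly the false positive: the byte search alone cannot distinguish it from a real annotation key.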
   
   If @michaelandrepearce has a real journal to test with and compare against `master`, that would be great, given that this change could positively affect your usage of the broker ;)

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875107
 
 
   @tabish121 @gemmellr @clebertsuconic @michaelandrepearce 
   
   This PR contains many changes and I know there are parts that could be simplified a lot.
   The most important bits that can really be a game-changer for AMQP on journal loading are on https://github.com/apache/activemq-artemis/pull/2975/commits/fc77a546ca1ee047218e938f0413c55ed8c838c6.
   
   So please review especially that part.
   
   An important note: 
   I've found that `AMQPMessagePersisterV2::decode` does something odd with `AMQPMessage::setAddress`. `AMQPMessagePersister::decode` can call `AMQPMessage::setAddress`, which allocates `AMQPMessage::extraProperties`, but 
   `AMQPMessagePersisterV2::decode` can then decode a new `extraProperties`, completely overwriting the existing one. Is it a bug, or is it supposed to work like that?
   I see that this behaviour was introduced by `ARTEMIS-1858` in 1ae2784dc6075875b18780fa8ba40f86cb895f7b, with this comment:
   ```java
      /**
       * This will set the address on CoreMessage.
       *
       * Note for AMQPMessages:
       * in AMQPMessages this will not really change the address on the message. Instead it will add a property
       * on extraProperties which only transverse internally at the broker.
       * Whatever you change here it won't affect anything towards the received message.
       *
       * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
       * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
       * @param address
       * @return
       */
      Message setAddress(SimpleString address);
   ```
   I've tried to mitigate this by using `CoreMessageObjectPools` to save `SimpleString` allocations [here](https://github.com/franz1981/activemq-artemis/blob/918800b309009fd440bf83b69b54a0d4f49db940/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java#L78), and by allowing `TypedProperties::decode` to append to or replace any existing `extraProperty`, which saves a `HashMap` allocation. IMO, though, it would be better addressed by a proper fix. WDYT? Any idea how?
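   The reload sequence described here can be modelled hypothetically. Class and key names are illustrative, not Artemis' actual code. The V1 path sets the address, which allocates the extra properties. The V2 path then decodes the persisted extra properties in one of two ways. Replacing the map loses the address unless it was persisted too. Merging into the existing map keeps earlier entries and avoids the extra `HashMap`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Hypothetical model of the two reload strategies; not the real persister code. */
   final class ExtraPropertiesReload {
      static final String ADDRESS_KEY = "_address"; // illustrative key name

      Map<String, Object> extraProperties;

      /** Mirrors the V1 path: setting the address allocates the map on demand. */
      void setAddress(String address) {
         if (extraProperties == null) {
            extraProperties = new HashMap<>();
         }
         extraProperties.put(ADDRESS_KEY, address);
      }

      /** Replaces the whole map: the address set earlier is lost unless it was persisted. */
      void decodeOverwriting(Map<String, Object> persisted) {
         extraProperties = new HashMap<>(persisted);
      }

      /** Appends/replaces entries in the existing map: earlier entries survive. */
      void decodeMerging(Map<String, Object> persisted) {
         if (extraProperties == null) {
            extraProperties = new HashMap<>(persisted);
         } else {
            extraProperties.putAll(persisted);
         }
      }
   }
   ```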

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875806
 
 
   @michaelandrepearce I suppose you are interested in numbers. In my synthetic tests, a 2 GB journal of very small messages (chosen to create tons of garbage from the per-message object overhead) couldn't be loaded by a broker with a `4GB` heap and always led to an OOM. Now it can load without significant GC pauses, e.g.:
   ![image](https://user-images.githubusercontent.com/13125299/74107402-d548ea00-4b6f-11ea-8c00-d62b22082282.png)
   
   We're not quite at the level of Core messages yet, but very close: I think the duplicated `address` is responsible for the difference.
   
   The commit that half-brute-force searches for `x-opt-delivery-time` is the one I'm not sure about:
   
   - IMO that search should ideally be implemented, optimally, by Proton itself (nudge nudge @tabish121 @gemmellr), but I'd be super happy to provide a PR for it
   - it can currently give rare false positives (causing message scanning when it isn't needed)
   - I've yet to verify how it behaves with many message annotations and longer messages
   
   If @michaelandrepearce has a real journal to test with and compare against `master`, that would be great, given that this change could positively affect your usage of the broker ;)

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377297543
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
 ##########
 @@ -156,6 +157,72 @@ public void testCreateMessageForPersistenceDataReload() throws ActiveMQException
       assertEquals(TEST_TO_ADDRESS, message.getAddress());
    }
 
+   @Test
 
 Review comment:
   I think I've understood what you mean and yes, I agree: I can improve the coverage with other cases :)

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377305670
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 ##########
 @@ -1183,8 +1219,126 @@ public Object getCorrelationID() {
       return this;
    }
 
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm jump table
+    */
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm
+    */
+   private static int indexOf(ReadableBuffer haystack, int start, int end, byte[] needle, int[] jumpTable) {
+      final long length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      for (int i = 0; i < length; i++) {
+         final int index = start + i;
+         final byte value = haystack.get(index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+         }
+         if (needle[j] == value) {
+            j++;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS,
+                                   SCHEDULED_DELIVERY_ENCODED,
+                                   SCHEDULED_DELIVERY_JUMP_TABLES);
+   }
+
+   private static boolean isValidSymbolEncoding(Symbol[] symbols, byte[][] encodedSymbols) {
+      for (int i = 0, size = symbols.length; i < size; i++) {
+         final Symbol symbol = symbols[i];
+         final byte[] encoded = encodedSymbols[i];
+         if (!Arrays.equals(symbol.toString().getBytes(StandardCharsets.US_ASCII), encoded)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, byte[][] encodedSymbols, int[][] jumpTables) {
+      assert isValidSymbolEncoding(symbols, encodedSymbols);
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      // messageDataScanned != SCANNED;
+      final ReadableBuffer data = this.data;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
 
 Review comment:
   I'm looking at 
   ![image](https://user-images.githubusercontent.com/13125299/74188245-5b3b6280-4c4e-11ea-9676-23e589920319.png)
   
   If that's accurate, I suppose I can use this structure to stop searching for `MessageAnnotations` earlier. Any advice here is welcome :)
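   The AMQP 1.0 message format fixes the order of sections: header, delivery-annotations, message-annotations, properties, application-properties, body (data, amqp-sequence or amqp-value), footer. Assuming sections are read in wire order, the search can stop as soon as a section ordered after the target shows up, because the target can no longer appear. `AmqpSection` below is a hypothetical enum, not a proton-j type, and the ordinal comparison works for any target section.

   ```java
   /** Sketch: AMQP 1.0 sections in specified order; the three body types share BODY. */
   enum AmqpSection {
      HEADER, DELIVERY_ANNOTATIONS, MESSAGE_ANNOTATIONS, PROPERTIES,
      APPLICATION_PROPERTIES, BODY, FOOTER;

      /**
       * Returns the position of target in sections (given in wire order), or -1. Stops as soon
       * as a section ordered after target is seen, since target cannot legally follow it.
       */
      static int indexOf(AmqpSection[] sections, AmqpSection target) {
         for (int i = 0; i < sections.length; i++) {
            final AmqpSection s = sections[i];
            if (s == target) {
               return i;
            }
            if (s.ordinal() > target.ordinal()) {
               return -1; // early stop: everything after this is past the target's slot
            }
         }
         return -1;
      }
   }
   ```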
   

[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 edited a comment on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-584611326
 
 
   @tabish121 I've tried to improve the fail-fast behaviour of both the `Symbol` search algorithm and the `MessageAnnotations` section search. The latter uses an `IdentityHashMap` to avoid checking, one by one, the N possible message sections that terminate the search. Let me know if that makes sense according to the AMQP spec :+1: 

[GitHub] [activemq-artemis] asfgit closed pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975
 
 
   

[GitHub] [activemq-artemis] franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-583875806
 
 
   @michaelandrepearce I suppose you are interested in numbers. In my synthetic tests, a 2 GB journal of very small messages couldn't be loaded by a broker with a `4GB` heap and always led to an OOM. Now it can load without significant GC pauses, e.g.:
   ![image](https://user-images.githubusercontent.com/13125299/74107402-d548ea00-4b6f-11ea-8c00-d62b22082282.png)
   
   We're not quite at the level of Core messages yet, but very close: I think the duplicated `address` is responsible for the difference.
   
   The commit that half-brute-force searches for `x-opt-delivery-time` is the one I'm not sure about:
   
   - IMO that search should ideally be implemented, optimally, by Proton itself (nudge nudge @tabish121 @gemmellr), but I'd be super happy to provide a PR for it
   - it can currently give rare false positives (causing message scanning when it isn't needed)
   - I've yet to verify how it behaves with many message annotations and longer messages
   
   If @michaelandrepearce has a real journal to test with and compare against `master`, that would be great, given that this change could positively affect your usage of the broker ;)

[GitHub] [activemq-artemis] clebertsuconic commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on issue #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#issuecomment-585354134
 
 
   I need this merged, as this will impact the changes I'm making for large messages.
   
   I see that you still have other work to do, such as adding more tests. In that case, can you please put any further changes into a new PR?
   
   Thanks a lot.

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading

Posted by GitBox <gi...@apache.org>.
franz1981 commented on a change in pull request #2975: ARTEMIS-2617 Improve AMQP Journal loading
URL: https://github.com/apache/activemq-artemis/pull/2975#discussion_r377296464
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
 ##########
 @@ -156,6 +157,72 @@ public void testCreateMessageForPersistenceDataReload() throws ActiveMQException
       assertEquals(TEST_TO_ADDRESS, message.getAddress());
    }
 
+   @Test
 
 Review comment:
   In this case it would lead to a false positive. That's fine for us, given that it is very unlikely for keys or values to be named like one of the specific symbols we search for.
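   A false positive is tolerable because the byte search acts as a conservative pre-filter. A miss means the key is definitely absent, assuming it is stored as its plain ASCII bytes, as AMQP symbols are. A hit only costs the full decode that would otherwise always happen. In this sketch, a naive byte search stands in for the KMP version, and `TwoStageCheck` and its `Supplier`-based decode are hypothetical.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Map;
   import java.util.function.Supplier;

   /** Hypothetical two-stage check: cheap byte pre-filter, then a confirming full decode. */
   final class TwoStageCheck {
      /** Naive substring search over bytes (stand-in for the KMP search). */
      static boolean containsBytes(byte[] haystack, byte[] needle) {
         outer:
         for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int k = 0; k < needle.length; k++) {
               if (haystack[i + k] != needle[k]) {
                  continue outer;
               }
            }
            return true;
         }
         return false;
      }

      static boolean hasAnnotation(byte[] raw, String key, Supplier<Map<String, Object>> fullDecode) {
         if (!containsBytes(raw, key.getBytes(StandardCharsets.US_ASCII))) {
            return false; // definitely absent: no decoding at all
         }
         return fullDecode.get().containsKey(key); // maybe present: confirm with a real decode
      }
   }
   ```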
