You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/26 02:45:41 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1374 Improve performance and GC overhead of AMQP transfer tags

ARTEMIS-1374 Improve performance and GC overhead of AMQP transfer tags

Use a more efficient means of creating AMQP transfer tags and pool
previously generated values for reuse on future sends.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/968268ee
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/968268ee
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/968268ee

Branch: refs/heads/master
Commit: 968268ee5d1964be860b4b7d7799072a0822476e
Parents: 8c87017
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 25 18:25:04 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 25 22:45:33 2017 -0400

----------------------------------------------------------------------
 .../amqp/proton/AMQPSessionContext.java         |   8 +-
 .../amqp/proton/AmqpTransferTagGenerator.java   | 126 +++++++++++++
 .../amqp/proton/ProtonServerSenderContext.java  |   2 +-
 .../proton/AmqpTransferTagGeneratorTest.java    | 186 +++++++++++++++++++
 4 files changed, 317 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/968268ee/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index bf60e17..5cd3515 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -45,14 +45,14 @@ public class AMQPSessionContext extends ProtonInitializable {
 
    protected final Session session;
 
-   private long currentTag = 0;
-
    protected Map<Receiver, ProtonServerReceiverContext> receivers = new ConcurrentHashMap<>();
 
    protected Map<Sender, ProtonServerSenderContext> senders = new ConcurrentHashMap<>();
 
    protected boolean closed = false;
 
+   protected final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
+
    public AMQPSessionContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, Session session) {
       this.connection = connection;
       this.sessionSPI = sessionSPI;
@@ -93,11 +93,11 @@ public class AMQPSessionContext extends ProtonInitializable {
    }
 
    public byte[] getTag() {
-      return Long.toHexString(currentTag++).getBytes();
+      return tagCache.getNextTag();
    }
 
    public void replaceTag(byte[] tag) {
-      // TODO: do we need to reuse this?
+      tagCache.returnTag(tag);
    }
 
    public void close() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/968268ee/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..2cbf9ed
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGenerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * Generates the binary delivery tags used to identify transfers over an AMQP
+ * link. When pooling is enabled, tags handed back through
+ * {@link #returnTag(byte[])} are kept and reused by later calls to
+ * {@link #getNextTag()} instead of allocating a new array for every send.
+ * <p>
+ * {@link #getNextTag()}, {@link #returnTag(byte[])} and the max pool size
+ * accessors are all synchronized on this instance.
+ */
+public final class AmqpTransferTagGenerator {
+
+   public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+   // Returned tags awaiting reuse; null when pooling is disabled.
+   private final Deque<byte[]> tagPool;
+
+   private long nextTagId;
+   private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+   /**
+    * Creates a generator that pools returned tags.
+    */
+   public AmqpTransferTagGenerator() {
+      this(true);
+   }
+
+   /**
+    * Creates a generator with pooling switched on or off.
+    *
+    * @param pool
+    *        true to keep returned tags for reuse, false to create a new tag
+    *        on every request.
+    */
+   public AmqpTransferTagGenerator(boolean pool) {
+      this.tagPool = pool ? new ArrayDeque<>() : null;
+   }
+
+   /**
+    * Retrieves the next available tag.
+    *
+    * @return the oldest pooled tag when one is available, otherwise a newly
+    *         created tag holding the next id in big-endian byte order using
+    *         the fewest bytes that can represent it.
+    */
+   public synchronized byte[] getNextTag() {
+      byte[] tagBytes = tagPool != null ? tagPool.pollFirst() : null;
+
+      if (tagBytes == null) {
+         long tag = nextTagId++;
+         int size = encodingSize(tag);
+
+         tagBytes = new byte[size];
+
+         // The least significant byte is written last so the array reads as a big-endian number.
+         for (int i = 0; i < size; ++i) {
+            tagBytes[size - 1 - i] = (byte) (tag >>> (i * 8));
+         }
+      }
+
+      return tagBytes;
+   }
+
+   /**
+    * When used as a pooled cache of tags the unused tags should always be
+    * returned once the transfer has been settled. The tag is dropped when
+    * pooling is disabled, when the pool already holds the maximum number of
+    * tags, or when the given tag is null.
+    *
+    * @param data
+    *        a previously borrowed tag that is no longer in use.
+    */
+   public synchronized void returnTag(byte[] data) {
+      // ArrayDeque rejects null elements, so a null tag is ignored instead of being allowed to throw.
+      if (data != null && tagPool != null && tagPool.size() < maxPoolSize) {
+         tagPool.offerLast(data);
+      }
+   }
+
+   /**
+    * Gets the current max pool size value.
+    *
+    * @return the current max tag pool size.
+    */
+   public synchronized int getMaxPoolSize() {
+      return maxPoolSize;
+   }
+
+   /**
+    * Sets the max tag pool size. If the size is smaller than the current number
+    * of pooled tags the pool will drain over time until it matches the max,
+    * because returned tags are only accepted while the pool is below the limit.
+    * A value of zero or less stops any further tags from being pooled.
+    *
+    * @param maxPoolSize
+    *        the maximum number of tags to hold in the pool.
+    */
+   public synchronized void setMaxPoolSize(int maxPoolSize) {
+      this.maxPoolSize = maxPoolSize;
+   }
+
+   /**
+    * @return true if the generator is using a pool of tags to reduce
+    *         allocations.
+    */
+   public boolean isPooling() {
+      return tagPool != null;
+   }
+
+   /**
+    * Computes how many bytes are needed to hold the given tag id.
+    *
+    * @param value
+    *        the tag id to encode.
+    *
+    * @return a size between 1 and 8; zero needs a single byte and negative
+    *         values (ids that have rolled over) need all eight.
+    */
+   private static int encodingSize(long value) {
+      // Count the significant bits and round up to whole bytes.
+      int significantBits = Long.SIZE - Long.numberOfLeadingZeros(value);
+      return Math.max(1, (significantBits + Byte.SIZE - 1) / Byte.SIZE);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/968268ee/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 868e9c8..c774b4d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -618,7 +618,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                }
             }
-            // todo add tag caching
+
             if (!preSettle) {
                protonSession.replaceTag(delivery.getTag());
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/968268ee/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java
new file mode 100644
index 0000000..0ed1ffb
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpTransferTagGeneratorTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Unit tests covering tag creation, pooling and id rollover in the
+ * {@link AmqpTransferTagGenerator}.
+ */
+public class AmqpTransferTagGeneratorTest {
+
+   /** The default constructor enables pooling with the default pool size. */
+   @Test
+   public void testCreate() {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator();
+      assertTrue(generator.isPooling());
+      assertEquals(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, generator.getMaxPoolSize());
+   }
+
+   /** Disabling pooling leaves the reported max pool size at its default. */
+   @Test
+   public void testCreateDisabled() {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(false);
+      assertFalse(generator.isPooling());
+      assertEquals(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE, generator.getMaxPoolSize());
+   }
+
+   /** Tags checked out back to back are distinct arrays with distinct contents. */
+   @Test
+   public void testNewTagsOnSuccessiveCheckouts() {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(true);
+
+      byte[] first = generator.getNextTag();
+      byte[] second = generator.getNextTag();
+      byte[] third = generator.getNextTag();
+
+      assertNotSame(first, second);
+      assertNotSame(first, third);
+      assertNotSame(third, second);
+
+      assertFalse(Arrays.equals(first, second));
+      assertFalse(Arrays.equals(first, third));
+      assertFalse(Arrays.equals(third, second));
+   }
+
+   /** Returned tags are handed out again instead of new ones being created. */
+   @Test
+   public void testTagPoolingInEffect() {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(true);
+
+      byte[] borrowedA = generator.getNextTag();
+      byte[] borrowedB = generator.getNextTag();
+      generator.returnTag(borrowedA);
+      generator.returnTag(borrowedB);
+      byte[] reusedA = generator.getNextTag();
+      byte[] reusedB = generator.getNextTag();
+
+      assertSame(borrowedA, reusedA);
+      assertSame(borrowedB, reusedB);
+      assertNotSame(borrowedA, reusedB);
+      assertNotSame(borrowedB, reusedA);
+   }
+
+   /** Pooled tags come back out in the order in which they were returned. */
+   @Test
+   public void testPooledTagsReturnedInCheckedInOrder() {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(true);
+
+      byte[] borrowedA = generator.getNextTag();
+      byte[] borrowedB = generator.getNextTag();
+      generator.returnTag(borrowedB);
+      generator.returnTag(borrowedA);
+      byte[] reusedFirst = generator.getNextTag();
+      byte[] reusedSecond = generator.getNextTag();
+
+      assertSame(borrowedA, reusedSecond);
+      assertSame(borrowedB, reusedFirst);
+      assertNotSame(borrowedA, reusedFirst);
+      assertNotSame(borrowedB, reusedSecond);
+   }
+
+   /** Tags use one byte for the first 256 ids and two bytes after that. */
+   @Test
+   public void testTagArrayGrowsWithTagValue() {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(false);
+
+      for (int id = 0; id < 512; ++id) {
+         byte[] tag = generator.getNextTag();
+         int expectedLength = id < 256 ? 1 : 2;
+
+         assertEquals(expectedLength, tag.length);
+      }
+   }
+
+   /** Reading a tag back as a big-endian number yields the id it was created from. */
+   @Test
+   public void testTagValueMatchesParsedArray() throws IOException {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(false);
+
+      for (int id = 0; id < Short.MAX_VALUE; ++id) {
+         byte[] tag = generator.getNextTag();
+         DataInputStream input = new DataInputStream(new ByteArrayInputStream(tag));
+
+         if (id >= 256) {
+            assertEquals(2, tag.length);
+            assertEquals(id, input.readShort());
+         } else {
+            assertEquals(1, tag.length);
+            assertEquals((byte) id, input.readByte());
+         }
+      }
+   }
+
+   /** Ids that have wrapped past Long.MAX_VALUE are still encoded as full eight byte tags. */
+   @Test
+   public void testTagGenerationWorksWithIdRollover() throws Exception {
+      AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(false);
+
+      // Deliberately overflows to force the counter into the negative range.
+      final long rolledOver = Long.MAX_VALUE + 1;
+
+      Field nextTagIdField = generator.getClass().getDeclaredField("nextTagId");
+      nextTagIdField.setAccessible(true);
+      nextTagIdField.set(generator, rolledOver);
+
+      assertNextTagIsEightByteValue(generator, rolledOver);
+      assertNextTagIsEightByteValue(generator, rolledOver + 1);
+   }
+
+   /** Checks out one tag and verifies it is eight bytes long and decodes to the expected id. */
+   private static void assertNextTagIsEightByteValue(AmqpTransferTagGenerator generator, long expected) throws IOException {
+      byte[] tag = generator.getNextTag();
+      DataInputStream input = new DataInputStream(new ByteArrayInputStream(tag));
+
+      assertEquals(8, tag.length);
+      assertEquals(expected, input.readLong());
+   }
+
+   /** Repeatedly borrows and returns a full pool's worth of tags; only run by hand. */
+   @Ignore("Used to test performance")
+   @Test
+   public void testTagGeneratorOverTime() {
+      final AmqpTransferTagGenerator generator = new AmqpTransferTagGenerator(true);
+      final int batchSize = AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE;
+      final int rounds = Short.MAX_VALUE * 16;
+      final ArrayList<byte[]> borrowed = new ArrayList<>(batchSize);
+
+      for (int round = 0; round < rounds; ++round) {
+         // Borrow as many tags as the pool is able to hold
+         for (int count = 0; count < batchSize; ++count) {
+            borrowed.add(generator.getNextTag());
+         }
+
+         // Hand every tag back, then forget them
+         borrowed.forEach(generator::returnTag);
+         borrowed.clear();
+      }
+   }
+}