You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2020/03/17 10:20:00 UTC

[activemq-artemis] branch master updated: ARTEMIS-2662 Using previously stored encodeSize on page record offset

This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c801c00  ARTEMIS-2662 Using previously stored encodeSize on page record offset
     new 10ed009  This closes #3021
c801c00 is described below

commit c801c00e335272e71c8b659efdf4067da644e637
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Mar 16 21:29:55 2020 -0400

    ARTEMIS-2662 Using previously stored encodeSize on page record offset
    
    There is no guarantee that the encode size is the same in AMQP right after a read,
    as the protocol may add additional bytes after decoding, such as headers, extra properties, etc.
---
 .../activemq/artemis/core/paging/PagedMessage.java |  7 ++
 .../paging/cursor/impl/PageSubscriptionImpl.java   |  2 +-
 .../activemq/artemis/core/paging/impl/Page.java    |  4 +-
 .../artemis/core/paging/impl/PagedMessageImpl.java | 16 ++++-
 .../wireformat/ReplicationPageWriteMessage.java    |  2 +-
 .../core/paging/cursor/impl/PageReaderTest.java    |  9 ++-
 .../tests/integration/amqp/AmqpTestSupport.java    | 17 +++++
 .../amqp/paging/AmqpPageReaderTest.java            | 83 ++++++++++++++++++++++
 .../integration/amqp/paging/AmqpPagingTest.java    | 80 +++++++++++++++++++++
 9 files changed, 212 insertions(+), 8 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 5b39691..c2344ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -48,4 +48,11 @@ public interface PagedMessage extends EncodingSupport {
     * @throws ActiveMQException
     */
    long getPersistentSize() throws ActiveMQException;
+
+   /** This returns how much storage the PagedMessage used, or is going to use.
+    *  We can't recalculate it from getEncodeSize() because some persisters don't guarantee
+    *  to re-store the data in the same number of bytes that was originally used.
+    *  In some cases (e.g. AMQP) headers or extra data may be added after decoding,
+    *  which may affect the outcome of getEncodeSize(). */
+   int getStoredSize();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 423b588..49b7b45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -1357,7 +1357,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
                   break;
                }
 
-               int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD;
+               int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getStoredSize() + Page.SIZE_RECORD;
                tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition());
 
                boolean valid = true;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index bbaea46..bdaf99e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -198,7 +198,7 @@ public final class Page implements Comparable<Page> {
                   final int endPosition = readFileBuffer.position() + encodedSize;
                   //this check must be performed upfront decoding
                   if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) {
-                     final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
+                     final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
                      readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition);
                      msg.decode(readFileBufferWrapper);
                      readFileBuffer.position(endPosition + 1);
@@ -363,7 +363,7 @@ public final class Page implements Comparable<Page> {
                      final int endPosition = fileBuffer.position() + encodedSize;
                      //this check must be performed upfront decoding
                      if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
-                        final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
+                        final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
                         fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
                         msg.decode(fileBufferWrapper);
                         fileBuffer.position(endPosition + 1);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index 11eb202..67d902f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -47,6 +47,8 @@ public class PagedMessageImpl implements PagedMessage {
 
    private long transactionID = 0;
 
+   private final int storedSize;
+
    private volatile StorageManager storageManager;
 
    public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
@@ -57,10 +59,22 @@ public class PagedMessageImpl implements PagedMessage {
    public PagedMessageImpl(final Message message, final long[] queueIDs) {
       this.queueIDs = queueIDs;
       this.message = message;
+      this.storedSize = 0;
    }
 
-   public PagedMessageImpl(StorageManager storageManager) {
+   public PagedMessageImpl(int storedSize, StorageManager storageManager) {
       this.storageManager = storageManager;
+      this.storedSize = storedSize;
+   }
+
+
+   @Override
+   public int getStoredSize() {
+      if (storedSize <= 0) {
+         return getEncodeSize();
+      } else {
+         return storedSize;
+      }
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index 4f5079d..7aa9f8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -56,7 +56,7 @@ public class ReplicationPageWriteMessage extends PacketImpl {
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       pageNumber = buffer.readInt();
-      pagedMessage = new PagedMessageImpl(null);
+      pagedMessage = new PagedMessageImpl(0, null);
       pagedMessage.decode(buffer);
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
index f347714..93cf5bd 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
@@ -58,7 +58,7 @@ public class PageReaderTest extends ActiveMQTestBase {
             PagePosition pagePosition = new PagePositionImpl(10, i);
             pagedMessage = pageReader.getMessage(pagePosition);
          } else {
-            int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
+            int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getStoredSize() + Page.SIZE_RECORD;
             PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
             PagePosition pagePosition = startPosition.nextPagePostion();
             assertEquals("Message " + i + " has wrong offset", offsets[i], pagePosition.getFileOffset());
@@ -85,7 +85,7 @@ public class PageReaderTest extends ActiveMQTestBase {
       PagePosition pagePosition = new PagePositionImpl(10, 0);
       PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition);
       assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize());
-      int nextFileOffset = offsets[0] + firstPagedMessage.getEncodeSize() + Page.SIZE_RECORD;
+      int nextFileOffset = offsets[0] + firstPagedMessage.getStoredSize() + Page.SIZE_RECORD;
       PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, 0));
       PagePosition nextPagePosition = startPosition.nextPagePostion();
       assertEquals("Message 1 has a wrong offset", offsets[1], nextPagePosition.getFileOffset());
@@ -147,7 +147,10 @@ public class PageReaderTest extends ActiveMQTestBase {
       for (int i = 0; i < num; i++) {
          Message msg = createMessage(simpleDestination, i, content);
          offsets[i] = (int)page.getFile().position();
-         page.write(new PagedMessageImpl(msg, new long[0]));
+         PagedMessageImpl pgdMessage = new PagedMessageImpl(msg, new long[0]);
+         long expectedPosition = pgdMessage.getEncodeSize() + Page.SIZE_RECORD + page.getFile().position();
+         page.write(pgdMessage);
+         Assert.assertEquals(page.getFile().position(), expectedPosition);
 
          Assert.assertEquals(i + 1, page.getNumberOfMessages());
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
index 15873a6..3532caa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -19,9 +19,14 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import java.net.URI;
 import java.util.LinkedList;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.After;
 
 /**
@@ -131,4 +136,16 @@ public class AmqpTestSupport extends ActiveMQTestBase {
    public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
       return new AmqpClient(brokerURI, username, password);
    }
+
+   public static AMQPStandardMessage encodeAndDecodeMessage(int messageFormat, MessageImpl message, int expectedSize) {
+      ByteBuf nettyBuffer = Unpooled.buffer(expectedSize);
+
+      message.encode(new NettyWritable(nettyBuffer));
+      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+      nettyBuffer.readBytes(bytes);
+
+      return new AMQPStandardMessage(messageFormat, bytes, null);
+   }
+
+
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageReaderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageReaderTest.java
new file mode 100644
index 0000000..2afb745
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageReaderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.paging;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageReaderTest;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AmqpPageReaderTest extends PageReaderTest {
+
+   public MessageImpl createProtonMessage(String address) {
+      AmqpMessage message = new AmqpMessage();
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < 1000; i++) {
+         builder.append('0');
+      }
+      final String data = builder.toString();
+      message.setText(data);
+      message.setAddress(address);
+      message.setDurable(true);
+
+      MessageImpl protonMessage = (MessageImpl) message.getWrappedMessage();
+
+      return protonMessage;
+   }
+
+   @Override
+   protected Message createMessage(SimpleString address, int msgId, byte[] content) {
+      MessageImpl protonMessage = createProtonMessage(address.toString());
+      AMQPStandardMessage amqpStandardMessage =  AmqpTestSupport.encodeAndDecodeMessage(0, protonMessage, 2 * 1024);
+      amqpStandardMessage.setMessageID(msgId);
+
+      return amqpStandardMessage;
+   }
+
+
+   @Test
+   public void testEncodeSize() throws Exception {
+
+      Message message = createMessage(SimpleString.toSimpleString("Test"), 1, new byte[10]);
+
+      MessagePersister persister = (MessagePersister)message.getPersister();
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
+      persister.encode(buffer, message);
+
+      Assert.assertEquals(persister.getEncodeSize(message), buffer.writerIndex());
+
+      // the very first byte is the persisterID, we skip that since we are calling the Persister directly
+      buffer.readerIndex(1);
+      Message messageRead = persister.decode(buffer, null, null);
+
+      // The current persister does not guarantee the same encode size after loading.
+      // If this ever changes we can uncomment the next line.
+      // Assert.assertEquals(persister.getEncodeSize(message), persister.getEncodeSize(messageRead));
+
+
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
new file mode 100644
index 0000000..2ed9845
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.paging;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AmqpPagingTest extends AmqpClientTestSupport {
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      super.addConfiguration(server);
+      final Map<String, AddressSettings> addressesSettings = server.getConfiguration().getAddressesSettings();
+      addressesSettings.get("#").setMaxSizeBytes(100000).setPageSizeBytes(10000);
+   }
+
+   @Test(timeout = 60000)
+   public void testPaging() throws Exception {
+      final int MSG_SIZE = 1000;
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < MSG_SIZE; i++) {
+         builder.append('0');
+      }
+      final String data = builder.toString();
+      final int MSG_COUNT = 1_000;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getQueueName(), true);
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      receiver.setPresettle(true);
+      receiver.flow(10);
+      Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait());
+      receiver.flow(0);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText(data);
+         sender.send(message);
+      }
+      sender.close();
+      receiver.flow(MSG_COUNT);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);
+         assertNotNull("Not received anything after " + i + " receive", receive);
+         receive.accept();
+      }
+      receiver.close();
+      connection.close();
+   }
+
+}