You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/01 14:54:11 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1705 Only messages from MessageReferences are subtracted from the queueMemorySize

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 66dbc9e3b -> 838859f59


ARTEMIS-1705 Only messages from MessageReferences are subtracted from the queueMemorySize


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c808f246
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c808f246
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c808f246

Branch: refs/heads/master
Commit: c808f246e5889b05ce37069a55f013c4f8a2ce06
Parents: 66dbc9e
Author: 17103355 <17...@cnsuning.com>
Authored: Wed Feb 28 11:38:00 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 1 09:53:58 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 17 ++++-
 .../management/QueueControlTest.java            | 68 ++++++++++++++++++--
 2 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c808f246/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 98f728d..58f31ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1484,9 +1484,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return iterQueue(flushLimit, filter1, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
+            actMessage(tx, ref, true);
+         }
+
+         @Override
+         public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
             incDelivering(ref);
             acknowledge(tx, ref, ackReason);
-            refRemoved(ref);
+            if (fromMessageReferences) {
+               refRemoved(ref);
+            }
          }
       });
    }
@@ -1558,7 +1565,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                if (filter1 == null || filter1.match(reference.getMessage())) {
                   count++;
                   txCount++;
-                  messageAction.actMessage(tx, reference);
+                  messageAction.actMessage(tx, reference, false);
                } else {
                   addTail(reference, false);
                }
@@ -2764,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             ref.acknowledge(tx, AckReason.KILLED);
          } else {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
-            move(tx, deadLetterAddress,null,  ref, false, AckReason.KILLED);
+            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED);
          }
       } else {
          ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
@@ -3191,6 +3198,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    abstract class QueueIterateAction {
 
       public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
+
+      public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
+         actMessage(tx, ref);
+      }
    }
 
    /* For external use we need to use a synchronized version since the list is not thread safe */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c808f246/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 9478690..8bd5d03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.management;
 import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
 
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -36,6 +37,7 @@ import javax.json.JsonObject;
 import javax.management.Notification;
 import javax.management.openmbean.CompositeData;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
@@ -57,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -946,8 +949,8 @@ public class QueueControlTest extends ManagementTestBase {
       session.createAddress(myTopic, RoutingType.MULTICAST, false);
 
       DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
-                                                            .setRoutingName("some-name").setAddress(myTopic.toString())
-                                                            .setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
+            .setRoutingName("some-name").setAddress(myTopic.toString())
+            .setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
       server.deployDivert(divert);
 
       // Send message to topic.
@@ -1506,6 +1509,63 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testRemoveAllWithPagingMode() throws Exception {
+
+      final int MESSAGE_SIZE = 1024 * 3; // 3k
+
+      // reset maxSize for Paging mode
+      Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize");
+      maxSizField.setAccessible(true);
+      maxSizField.setLong(server.getPagingManager(), 10240);
+      clearDataRecreateServerDirs();
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queueName = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, RoutingType.MULTICAST, queueName, null, durable);
+
+      Queue queue = server.locateQueue(queueName);
+      Assert.assertEquals(false, queue.getPageSubscription().isPaging());
+
+      ClientProducer producer = session.createProducer(address);
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      final int numberOfMessages = 8000;
+      ClientMessage message;
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         producer.send(message);
+      }
+
+      Assert.assertEquals(true, queue.getPageSubscription().isPaging());
+
+      QueueControl queueControl = createManagementControl(address, queueName);
+      assertMessageMetrics(queueControl, numberOfMessages, durable);
+      int removedMatchedMessagesCount = queueControl.removeAllMessages();
+      Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount);
+      assertMessageMetrics(queueControl, 0, durable);
+
+      Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
+      queueMemoprySizeField.setAccessible(true);
+      AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue);
+      Assert.assertEquals(0, queueMemorySize.get());
+
+      session.deleteQueue(queueName);
+   }
+
+   @Test
    public void testRemoveMessagesWithEmptyFilter() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
@@ -2491,8 +2551,8 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable,
-         Supplier<Number> count, Supplier<Number> size,
-         Supplier<Number>durableCount, Supplier<Number> durableSize) throws Exception {
+                                Supplier<Number> count, Supplier<Number> size,
+                                Supplier<Number> durableCount, Supplier<Number> durableSize) throws Exception {
 
       //make sure count stat equals message count
       Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100));


[2/2] activemq-artemis git commit: This closes #1907

Posted by cl...@apache.org.
This closes #1907


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/838859f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/838859f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/838859f5

Branch: refs/heads/master
Commit: 838859f59a27f7e8503510fe0547485c216fa5e4
Parents: 66dbc9e c808f24
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 1 09:53:59 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 1 09:53:59 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 17 ++++-
 .../management/QueueControlTest.java            | 68 ++++++++++++++++++--
 2 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------