You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/01 14:54:11 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1705 Only messages from MessageReferences are subtracted from the queueMemorySize
Repository: activemq-artemis
Updated Branches:
refs/heads/master 66dbc9e3b -> 838859f59
ARTEMIS-1705 Only messages from MessageReferences are subtracted from the queueMemorySize
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c808f246
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c808f246
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c808f246
Branch: refs/heads/master
Commit: c808f246e5889b05ce37069a55f013c4f8a2ce06
Parents: 66dbc9e
Author: 17103355 <17...@cnsuning.com>
Authored: Wed Feb 28 11:38:00 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 1 09:53:58 2018 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 17 ++++-
.../management/QueueControlTest.java | 68 ++++++++++++++++++--
2 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c808f246/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 98f728d..58f31ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1484,9 +1484,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(flushLimit, filter1, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
+ actMessage(tx, ref, true);
+ }
+
+ @Override
+ public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
incDelivering(ref);
acknowledge(tx, ref, ackReason);
- refRemoved(ref);
+ if (fromMessageReferences) {
+ refRemoved(ref);
+ }
}
});
}
@@ -1558,7 +1565,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (filter1 == null || filter1.match(reference.getMessage())) {
count++;
txCount++;
- messageAction.actMessage(tx, reference);
+ messageAction.actMessage(tx, reference, false);
} else {
addTail(reference, false);
}
@@ -2764,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ref.acknowledge(tx, AckReason.KILLED);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
- move(tx, deadLetterAddress,null, ref, false, AckReason.KILLED);
+ move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED);
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
@@ -3191,6 +3198,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
abstract class QueueIterateAction {
public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
+
+ public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
+ actMessage(tx, ref);
+ }
}
/* For external use we need to use a synchronized version since the list is not thread safe */
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c808f246/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 9478690..8bd5d03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.management;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -36,6 +37,7 @@ import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
@@ -57,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -946,8 +949,8 @@ public class QueueControlTest extends ManagementTestBase {
session.createAddress(myTopic, RoutingType.MULTICAST, false);
DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
- .setRoutingName("some-name").setAddress(myTopic.toString())
- .setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
+ .setRoutingName("some-name").setAddress(myTopic.toString())
+ .setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
server.deployDivert(divert);
// Send message to topic.
@@ -1506,6 +1509,63 @@ public class QueueControlTest extends ManagementTestBase {
}
@Test
+ public void testRemoveAllWithPagingMode() throws Exception {
+
+ final int MESSAGE_SIZE = 1024 * 3; // 3k
+
+ // reset maxSize for Paging mode
+ Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize");
+ maxSizField.setAccessible(true);
+ maxSizField.setLong(server.getPagingManager(), 10240);
+ clearDataRecreateServerDirs();
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queueName = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, RoutingType.MULTICAST, queueName, null, durable);
+
+ Queue queue = server.locateQueue(queueName);
+ Assert.assertEquals(false, queue.getPageSubscription().isPaging());
+
+ ClientProducer producer = session.createProducer(address);
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ final int numberOfMessages = 8000;
+ ClientMessage message;
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ }
+
+ Assert.assertEquals(true, queue.getPageSubscription().isPaging());
+
+ QueueControl queueControl = createManagementControl(address, queueName);
+ assertMessageMetrics(queueControl, numberOfMessages, durable);
+ int removedMatchedMessagesCount = queueControl.removeAllMessages();
+ Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount);
+ assertMessageMetrics(queueControl, 0, durable);
+
+ Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
+ queueMemoprySizeField.setAccessible(true);
+ AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue);
+ Assert.assertEquals(0, queueMemorySize.get());
+
+ session.deleteQueue(queueName);
+ }
+
+ @Test
public void testRemoveMessagesWithEmptyFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
@@ -2491,8 +2551,8 @@ public class QueueControlTest extends ManagementTestBase {
}
protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable,
- Supplier<Number> count, Supplier<Number> size,
- Supplier<Number>durableCount, Supplier<Number> durableSize) throws Exception {
+ Supplier<Number> count, Supplier<Number> size,
+ Supplier<Number> durableCount, Supplier<Number> durableSize) throws Exception {
//make sure count stat equals message count
Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100));
[2/2] activemq-artemis git commit: This closes #1907
Posted by cl...@apache.org.
This closes #1907
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/838859f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/838859f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/838859f5
Branch: refs/heads/master
Commit: 838859f59a27f7e8503510fe0547485c216fa5e4
Parents: 66dbc9e c808f24
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 1 09:53:59 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 1 09:53:59 2018 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 17 ++++-
.../management/QueueControlTest.java | 68 ++++++++++++++++++--
2 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------