You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/15 04:06:52 UTC
[activemq-artemis] branch master updated: ARTEMIS-2859 - track
owning page store as in a message reference to ensure correct usage
tracking, only track size on the owning store,
reference everywhere else via refUp
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 4e8ce9e ARTEMIS-2859 - track owning page store as in a message reference to ensure correct usage tracking, only track size on the owning store, reference everywhere else via refUp
new 4395a95 This closes #3282
4e8ce9e is described below
commit 4e8ce9ed1001f74dec600e3aff85ebe605d66a2a
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Oct 14 16:54:39 2020 +0100
ARTEMIS-2859 - track owning page store as in a message reference to ensure correct usage tracking, only track size on the owning store, reference everywhere else via refUp
---
.../core/paging/cursor/PagedReferenceImpl.java | 11 +++
.../artemis/core/paging/impl/PagingStoreImpl.java | 16 +---
.../core/postoffice/impl/PostOfficeImpl.java | 24 +++--
.../artemis/core/server/MessageReference.java | 9 +-
.../apache/activemq/artemis/core/server/Queue.java | 4 +-
.../server/impl/GroupFirstMessageReference.java | 12 +++
.../artemis/core/server/impl/LastValueQueue.java | 10 ++
.../core/server/impl/MessageReferenceImpl.java | 18 +++-
.../artemis/core/server/impl/QueueImpl.java | 39 ++++----
.../artemis/core/server/impl/RefsOperation.java | 4 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 9 +-
.../artemis/tests/util/ActiveMQTestBase.java | 2 +-
.../jms/cluster/TopicClusterPageStoreSizeTest.java | 105 +++++++++++++++++++++
.../tests/integration/paging/GlobalPagingTest.java | 2 +-
.../core/server/impl/QueueConcurrentTest.java | 2 +-
.../impl/fakes/FakeSequentialFileFactory.java | 3 +-
.../tests/unit/core/postoffice/impl/FakeQueue.java | 4 +-
18 files changed, 214 insertions(+), 62 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 76f5a05..27e6167 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -406,6 +407,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
}
@Override
+ public PagingStore getOwner() {
+ return null;
+ }
+
+ @Override
+ public void setOwner(PagingStore owner) {
+
+ }
+
+ @Override
public boolean isDurable() {
if (durable == UNDEFINED_IS_DURABLE) {
durable = getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 6fb4797..31c969d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -966,11 +966,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void refUp(Message message, int count) {
- if (count == 1) {
- this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
- } else {
- this.addSize(MessageReferenceImpl.getMemoryEstimate());
- }
+ this.addSize(MessageReferenceImpl.getMemoryEstimate());
}
@Override
@@ -979,15 +975,7 @@ public class PagingStoreImpl implements PagingStore {
// this could happen on paged messages since they are not routed and refUp is never called
return;
}
-
- if (count == 0) {
- this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
-
- } else {
- this.addSize(-MessageReferenceImpl.getMemoryEstimate());
- }
-
-
+ this.addSize(-MessageReferenceImpl.getMemoryEstimate());
}
private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index a3576b3..a904baf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1201,7 +1201,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
- MessageReference reference = MessageReference.Factory.createReference(message, queue);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue, pagingManager.getPageStore(message.getAddressSimpleString()));
Long scheduledDeliveryTime;
if (message.hasScheduledDeliveryTime()) {
@@ -1211,6 +1211,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
+ queue.refUp(reference);
queue.durableUp(message);
if (tx == null) {
@@ -1455,8 +1456,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
deliveryTime = message.getScheduledDeliveryTime();
}
+ PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
- PagingStore store = pagingManager.getPageStore(entry.getKey());
+ PagingStore store;
+ if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
+ store = owningStore;
+ } else {
+ store = pagingManager.getPageStore(entry.getKey());
+ }
if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
if (message.isLargeMessage()) {
@@ -1469,14 +1476,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
for (Queue queue : entry.getValue().getNonDurableQueues()) {
- MessageReference reference = MessageReference.Factory.createReference(message, queue);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
if (deliveryTime != null) {
reference.setScheduledDeliveryTime(deliveryTime);
}
refs.add(reference);
- queue.refUp(message);
+ queue.refUp(reference);
}
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
@@ -1484,7 +1491,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
while (iter.hasNext()) {
Queue queue = iter.next();
- MessageReference reference = MessageReference.Factory.createReference(message, queue);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
reference.setAlreadyAcked();
@@ -1497,6 +1504,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
reference.setScheduledDeliveryTime(deliveryTime);
}
refs.add(reference);
+ queue.refUp(reference);
if (message.isDurable()) {
int durableRefCount = queue.durableUp(message);
@@ -1528,8 +1536,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
storageManager.updateScheduledDeliveryTime(reference);
}
}
- } else {
- queue.refUp(message);
}
}
}
@@ -1852,12 +1858,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// Reverse the ref counts, and paging sizes
for (MessageReference ref : refs) {
+ ref.getQueue().refDown(ref);
Message message = ref.getMessage();
-
if (message.isDurable() && ref.getQueue().isDurable()) {
ref.getQueue().durableDown(message);
- } else {
- ref.getQueue().refDown(message);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index c55910b..6765e4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -22,6 +22,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -34,8 +35,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public interface MessageReference {
final class Factory {
- public static MessageReference createReference(Message encode, final Queue queue) {
- return new MessageReferenceImpl(encode, queue);
+ public static MessageReference createReference(Message encode, final Queue queue, PagingStore pageStore) {
+ return new MessageReferenceImpl(encode, queue, pageStore);
}
}
boolean isPaged();
@@ -136,4 +137,8 @@ public interface MessageReference {
* @throws ActiveMQException
*/
long getPersistentSize() throws ActiveMQException;
+
+ PagingStore getOwner();
+
+ void setOwner(PagingStore owner);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 0f2a071..9ce31d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -68,9 +68,9 @@ public interface Queue extends Bindable,CriticalComponent {
int durableDown(Message message);
- void refUp(Message message);
+ void refUp(MessageReference messageReference);
- void refDown(Message message);
+ void refDown(MessageReference messageReference);
/**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index d4db19b..6e2f1fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -20,6 +20,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -37,6 +38,7 @@ public class GroupFirstMessageReference implements MessageReference {
private final MessageReference messageReference;
private final SimpleString key;
private volatile Message message;
+ private volatile PagingStore owner;
public GroupFirstMessageReference(SimpleString key, MessageReference messageReference) {
this.messageReference = messageReference;
@@ -215,4 +217,14 @@ public class GroupFirstMessageReference implements MessageReference {
public long getPersistentSize() throws ActiveMQException {
return messageReference.getPersistentSize();
}
+
+ @Override
+ public PagingStore getOwner() {
+ return this.owner;
+ }
+
+ @Override
+ public void setOwner(PagingStore owner) {
+ this.owner = owner;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 9f3c82b..3cb3d09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -538,6 +538,16 @@ public class LastValueQueue extends QueueImpl {
public long getPersistentSize() throws ActiveMQException {
return ref.getPersistentSize();
}
+
+ @Override
+ public PagingStore getOwner() {
+ return ref.getOwner();
+ }
+
+ @Override
+ public void setOwner(PagingStore owner) {
+ ref.setOwner(owner);
+ }
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index dea1478..09b3650 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -35,6 +36,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
+ private volatile PagingStore owner;
public static Comparator<MessageReference> getIDComparator() {
return idComparator;
@@ -102,12 +104,16 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
message = other.message;
this.queue = queue;
+
+ this.owner = other.owner;
}
- public MessageReferenceImpl(final Message message, final Queue queue) {
+ public MessageReferenceImpl(final Message message, final Queue queue, final PagingStore owner) {
this.message = message;
this.queue = queue;
+
+ this.owner = owner;
}
// MessageReference implementation -------------------------------
@@ -348,4 +354,14 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
public long getPersistentSize() throws ActiveMQException {
return this.getMessage().getPersistentSize();
}
+
+ @Override
+ public PagingStore getOwner() {
+ return this.owner;
+ }
+
+ @Override
+ public void setOwner(PagingStore owner) {
+ this.owner = owner;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0013506..35f66f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -970,36 +970,37 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public int durableUp(Message message) {
- int count = message.durableUp();
- if (pagingStore != null) {
- pagingStore.durableUp(message, count);
- }
- return count;
+ return message.durableUp();
}
@Override
public int durableDown(Message message) {
- int count = message.durableDown();
- if (pagingStore != null) {
- pagingStore.durableDown(message, count);
- }
- return count;
+ return message.durableDown();
}
@Override
- public void refUp(Message message) {
- int count = message.refUp();
+ public void refUp(MessageReference messageReference) {
+ int count = messageReference.getMessage().refUp();
+ if (count == 1) {
+ if (messageReference.getOwner() != null) {
+ messageReference.getOwner().addSize(messageReference.getMessageMemoryEstimate());
+ }
+ }
if (pagingStore != null) {
- pagingStore.refUp(message, count);
+ pagingStore.refUp(messageReference.getMessage(), count);
}
-
}
@Override
- public void refDown(Message message) {
- int count = message.refDown();
+ public void refDown(MessageReference messageReference) {
+ int count = messageReference.getMessage().refDown();
+ if (count == 0) {
+ if (messageReference.getOwner() != null) {
+ messageReference.getOwner().addSize(-messageReference.getMessageMemoryEstimate());
+ }
+ }
if (pagingStore != null) {
- pagingStore.refDown(message, count);
+ pagingStore.refDown(messageReference.getMessage(), count);
}
}
@@ -3826,6 +3827,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (message == null || (nonDestructive && reason == AckReason.NORMAL))
return;
+ queue.refDown(ref);
+
boolean durableRef = message.isDurable() && queue.isDurable();
if (durableRef) {
@@ -3854,8 +3857,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
}
}
- } else {
- queue.refDown(message);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 552c190..054ba73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -144,7 +144,7 @@ public class RefsOperation extends TransactionOperationAbstract {
ackedTX.setContainsPersistent();
}
- ref.getQueue().refUp(message);
+ ref.getQueue().refUp(ref);
}
ackedTX.commit(true);
} catch (Exception e) {
@@ -188,7 +188,7 @@ public class RefsOperation extends TransactionOperationAbstract {
for (MessageReference refmsg : pagedMessagesToPostACK) {
((PagedReference)refmsg).removePendingFlag();
if (((PagedReference) refmsg).isLargeMessage()) {
- refmsg.getQueue().refDown(refmsg.getMessage());
+ refmsg.getQueue().refDown(refmsg);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7f71970..35a90e8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -646,7 +646,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
- MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
+ MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue, null);
reference.setDeliveryCount(0);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 00b205e..8996287 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -250,7 +250,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
long nextMessageID,
long nextScheduledTime,
boolean tail) {
- MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
+ MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null, null);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.addInPlace(nextScheduledTime, refImpl, tail);
}
@@ -260,7 +260,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
long nextScheduledTime,
boolean tail,
Queue queue) {
- MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
+ MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue, null);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.checkAndSchedule(refImpl, tail);
}
@@ -808,6 +808,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public long getPersistentSize() throws ActiveMQException {
return 0;
}
+
}
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
@@ -843,12 +844,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void refUp(Message message) {
+ public void refUp(MessageReference messageReference) {
}
@Override
- public void refDown(Message message) {
+ public void refDown(MessageReference messageReference) {
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index d1adf59..b538f8a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2138,7 +2138,7 @@ public abstract class ActiveMQTestBase extends Assert {
protected MessageReference generateReference(final Queue queue, final long id) {
Message message = generateMessage(id);
- return MessageReference.Factory.createReference(message, queue);
+ return MessageReference.Factory.createReference(message, queue, null);
}
protected int calculateRecordSize(final int size, final int alignment) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
new file mode 100644
index 0000000..b8bf8b8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.cluster;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
+import org.junit.Test;
+
+public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
+   // Checks that no page store reports a negative address size after a clustered topic send.
+   public static final String TOPIC = "jms.t1";
+
+   @Test
+   public void testPageStoreSizeWithClusteredDurableSub() throws Exception {
+      doTestPageStoreSizeWithClusteredDurableSub(false);
+   }
+
+   @Test
+   public void testPageStoreSizeWithClusteredDurableSubWithPaging() throws Exception {
+      doTestPageStoreSizeWithClusteredDurableSub(true);
+   }
+   // forcePaging: when true, every page store on both servers is put into paging mode before the send.
+   private void doTestPageStoreSizeWithClusteredDurableSub(boolean forcePaging) throws Exception {
+      // cf1/cf2 come from the base class; presumably one connection factory per cluster node (confirm).
+      Connection conn1 = cf1.createConnection();
+
+      conn1.setClientID("someClient1");
+
+      Connection conn2 = cf2.createConnection();
+
+      conn2.setClientID("someClient2");
+
+      conn1.start();
+
+      conn2.start();
+
+      Topic topic1 = createTopic(TOPIC, true);
+
+      Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod1 = session1.createProducer(null);
+      prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+      // One durable subscriber per connection, so the single message sent below is received on both.
+      MessageConsumer cons1 = session1.createDurableSubscriber(topic1, "sub1");
+      MessageConsumer cons2 = session2.createDurableSubscriber(topic1, "sub2");
+
+      waitForBindings(server1, TOPIC, true, 1, 1, 2000);
+      waitForBindings(server2, TOPIC, true, 1, 1, 2000);
+      waitForBindings(server1, TOPIC, false, 1, 1, 2000);
+      waitForBindings(server2, TOPIC, false, 1, 1, 2000);
+
+      if (forcePaging) {
+         for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
+            server1.getPagingManager().getPageStore(psName).startPaging();
+         }
+         for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+            server2.getPagingManager().getPageStore(psName).startPaging();
+         }
+      }
+
+      prod1.send(topic1, session1.createTextMessage("someMessage"));
+
+      TextMessage m2 = (TextMessage) cons2.receive(5000);
+      assertNotNull(m2);
+      TextMessage m1 = (TextMessage) cons1.receive(5000);
+      assertNotNull(m1);
+      // Null-check first: receive() returns null on timeout, which must fail the assertion, not throw an NPE.
+      assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
+
+      conn1.close();
+      conn2.close();
+      // Sizes are checked only after both connections are closed.
+      for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
+         assertTrue("non negative size: " + psName, server1.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
+      }
+
+      for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+         assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
+      }
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index 3dee2b2..97c941e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -334,7 +334,7 @@ public class GlobalPagingTest extends PagingTest {
int id = 1000;
try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
final Queue queue = server.locateQueue(replyQueue);
- final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
+ final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue, null);
reference.getMessage().setMessageID(id++);
//it will cause QueueImpl::directDeliver -> false
queue.addHead(reference, false);
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
index 6d73cfd..3cf0cae 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
@@ -140,7 +140,7 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
while (System.currentTimeMillis() - start < testTime) {
Message message = generateMessage(i);
- MessageReference ref = MessageReference.Factory.createReference(message, queue);
+ MessageReference ref = MessageReference.Factory.createReference(message, queue, null);
queue.addTail(ref, false);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index 9091c81..0c5d89e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -312,11 +312,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
@Override
public void delete() {
+ fileMap.remove(fileName);
if (open) {
close();
}
-
- fileMap.remove(fileName);
}
@Override
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 1ab01c1..80a9252 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -76,12 +76,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void refUp(Message message) {
+ public void refUp(MessageReference messageReference) {
}
@Override
- public void refDown(Message message) {
+ public void refDown(MessageReference messageReference) {
}