You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2018/11/05 10:05:50 UTC
activemq-artemis git commit: ARTEMIS-1710 Allow management msgs to
exceed global-max-size limit
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x 1fef3dcc4 -> ade0003c3
ARTEMIS-1710 Allow management msgs to exceed global-max-size limit
(cherry picked from commit 270b383e80296fb47dba6a719ef1616ddcaab1ef)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ade0003c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ade0003c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ade0003c
Branch: refs/heads/2.6.x
Commit: ade0003c33ba1cd3200b42e877e6fd0e5a3997e3
Parents: 1fef3dc
Author: Francesco Nigro <ni...@gmail.com>
Authored: Sat Nov 3 16:37:26 2018 +0100
Committer: andytaylor <an...@gmail.com>
Committed: Mon Nov 5 10:05:13 2018 +0000
----------------------------------------------------------------------
.../artemis/cli/commands/tools/DBOption.java | 4 +-
.../amqp/broker/AMQPSessionCallback.java | 8 +-
.../core/protocol/openwire/amq/AMQSession.java | 25 ++++--
.../management/impl/AddressControlImpl.java | 28 ++++--
.../core/paging/impl/PagingManagerImpl.java | 18 +++-
.../journal/AbstractJournalStorageManager.java | 3 +
.../core/postoffice/impl/BindingsImpl.java | 6 +-
.../core/postoffice/impl/PostOfficeImpl.java | 6 +-
.../artemis/core/server/QueueConfig.java | 8 +-
.../core/server/impl/ActiveMQServerImpl.java | 2 +-
.../server/impl/PostOfficeJournalLoader.java | 2 +-
.../core/server/impl/ScaleDownHandler.java | 13 ++-
.../core/server/impl/ServerSessionImpl.java | 4 +-
.../core/server/files/FileMoveManagerTest.java | 3 +-
.../broker/region/policy/DestinationProxy.java | 15 +++-
.../integration/paging/GlobalPagingTest.java | 95 ++++++++++++++++++++
.../replication/ReplicationTest.java | 2 +-
.../core/postoffice/impl/BindingsImplTest.java | 2 +-
18 files changed, 203 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
index aa0b47a..2f2d8e9 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
@@ -241,11 +241,11 @@ public class DBOption extends OptionalLocking {
storageManager, 1000L,
scheduledExecutorService, executorFactory,
false, null);
- pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
+ pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
} else {
storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null);
- pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
+ pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 3d8ae5a..61816af 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -486,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback {
try {
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
- if (store.isRejectingMessages()) {
+ if (store != null && store.isRejectingMessages()) {
// We drop pre-settled messages (and abort any associated Tx)
if (delivery.remotelySettled()) {
if (transaction != null) {
@@ -585,7 +585,11 @@ public class AMQPSessionCallback implements SessionCallback {
pagingManager.checkMemory(runnable);
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
- store.checkMemory(runnable);
+ if (store != null) {
+ store.checkMemory(runnable);
+ } else {
+ runnable.run();
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 0429297..bad9bc5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -418,9 +418,13 @@ public class AMQSession implements SessionCallback {
//non-persistent messages goes here, by default we stop reading from
//transport
connection.getTransportConnection().setAutoRead(false);
- if (!store.checkMemory(enableAutoReadAndTtl)) {
- enableAutoReadAndTtl();
- throw new ResourceAllocationException("Queue is full " + address);
+ if (store != null) {
+ if (!store.checkMemory(enableAutoReadAndTtl)) {
+ enableAutoReadAndTtl();
+ throw new ResourceAllocationException("Queue is full " + address);
+ }
+ } else {
+ enableAutoReadAndTtl.run();
}
getCoreSession().send(coreMsg, false, dest.isTemporary());
@@ -443,7 +447,7 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
- if (!store.checkMemory(false, () -> {
+ final Runnable task = () -> {
Exception exceptionToSend = null;
try {
@@ -496,10 +500,15 @@ public class AMQSession implements SessionCallback {
});
}
}
- })) {
- this.connection.getContext().setDontSendReponse(false);
- connection.enableTtl();
- throw new ResourceAllocationException("Queue is full " + address);
+ };
+ if (store != null) {
+ if (!store.checkMemory(false, task)) {
+ this.connection.getContext().setDontSendReponse(false);
+ connection.enableTtl();
+ throw new ResourceAllocationException("Queue is full " + address);
+ }
+ } else {
+ task.run();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 0eb39e0..b24c370 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -201,17 +201,29 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public long getNumberOfBytesPerPage() throws Exception {
clearIO();
try {
- return pagingManager.getPageStore(addressInfo.getName()).getPageSizeBytes();
+ final PagingStore pagingStore = getPagingStore();
+ if (pagingStore == null) {
+ return 0;
+ }
+ return pagingStore.getPageSizeBytes();
} finally {
blockOnIO();
}
}
+ private PagingStore getPagingStore() throws Exception {
+ return pagingManager.getPageStore(addressInfo.getName());
+ }
+
@Override
public long getAddressSize() throws Exception {
clearIO();
try {
- return pagingManager.getPageStore(addressInfo.getName()).getAddressSize();
+ final PagingStore pagingStore = getPagingStore();
+ if (pagingStore == null) {
+ return 0;
+ }
+ return pagingStore.getAddressSize();
} finally {
blockOnIO();
}
@@ -240,7 +252,11 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public boolean isPaging() throws Exception {
clearIO();
try {
- return pagingManager.getPageStore(addressInfo.getName()).isPaging();
+ final PagingStore pagingStore = getPagingStore();
+ if (pagingStore == null) {
+ return false;
+ }
+ return pagingStore.isPaging();
} finally {
blockOnIO();
}
@@ -250,12 +266,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public int getNumberOfPages() throws Exception {
clearIO();
try {
- PagingStore pageStore = pagingManager.getPageStore(addressInfo.getName());
+ final PagingStore pageStore = getPagingStore();
- if (!pageStore.isPaging()) {
+ if (pageStore == null || !pageStore.isPaging()) {
return 0;
} else {
- return pagingManager.getPageStore(addressInfo.getName()).getNumberOfPages();
+ return pageStore.getNumberOfPages();
}
} finally {
blockOnIO();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 07f5ad7..357fda2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -86,6 +86,8 @@ public final class PagingManagerImpl implements PagingManager {
private ActiveMQScheduledComponent scheduledComponent = null;
+ private final SimpleString managementAddress;
+
// Static
// --------------------------------------------------------------------------------------------------------------------------
@@ -105,17 +107,25 @@ public final class PagingManagerImpl implements PagingManager {
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final long maxSize) {
+ final long maxSize,
+ final SimpleString managementAddress) {
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
this.memoryExecutor = pagingSPI.newExecutor();
+ this.managementAddress = managementAddress;
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
- this(pagingSPI, addressSettingsRepository, -1);
+ this(pagingSPI, addressSettingsRepository, -1, null);
+ }
+
+ public PagingManagerImpl(final PagingStoreFactory pagingSPI,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final SimpleString managementAddress) {
+ this(pagingSPI, addressSettingsRepository, -1, managementAddress);
}
@Override
@@ -329,6 +339,9 @@ public final class PagingManagerImpl implements PagingManager {
*/
@Override
public PagingStore getPageStore(final SimpleString storeName) throws Exception {
+ if (managementAddress != null && storeName.startsWith(managementAddress)) {
+ return null;
+ }
PagingStore store = stores.get(storeName);
if (store != null) {
@@ -438,6 +451,7 @@ public final class PagingManagerImpl implements PagingManager {
}
private PagingStore newStore(final SimpleString address) throws Exception {
+ assert managementAddress == null || (managementAddress != null && !address.startsWith(managementAddress));
syncLock.readLock().lock();
try {
PagingStore store = stores.get(address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 166b35b..e681f07 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1244,6 +1244,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (queueInfo != null) {
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
+ if (store == null) {
+ return null;
+ }
subs = store.getCursorProvider().getSubscription(queueID);
pageSubscriptions.put(queueID, subs);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index c20e988..56abddb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -62,13 +61,10 @@ public final class BindingsImpl implements Bindings {
private final GroupingHandler groupingHandler;
- private final PagingStore pageStore;
-
private final SimpleString name;
- public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, final PagingStore pageStore) {
+ public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
this.groupingHandler = groupingHandler;
- this.pageStore = pageStore;
this.name = name;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 68df53d..3bf3d86 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1200,7 +1200,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store = pagingManager.getPageStore(entry.getKey());
- if (storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
+ if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
}
@@ -1564,9 +1564,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public Bindings createBindings(final SimpleString address) throws Exception {
+ public Bindings createBindings(final SimpleString address) {
GroupingHandler groupingHandler = server.getGroupingHandler();
- BindingsImpl bindings = new BindingsImpl(address, groupingHandler, pagingManager.getPageStore(address));
+ BindingsImpl bindings = new BindingsImpl(address, groupingHandler);
if (groupingHandler != null) {
groupingHandler.addListener(bindings);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 75f859d..6b1c284 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.FilterUtils;
import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
public final class QueueConfig {
@@ -163,7 +164,12 @@ public final class QueueConfig {
final PageSubscription pageSubscription;
if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) {
try {
- pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable);
+ final PagingStore pageStore = this.pagingManager.getPageStore(address);
+ if (pageStore != null) {
+ pageSubscription = pageStore.getCursorProvider().createSubscription(id, filter, durable);
+ } else {
+ pageSubscription = null;
+ }
} catch (Exception e) {
throw new IllegalStateException(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 059f6bd..80e4217 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2144,7 +2144,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public PagingManager createPagingManager() throws Exception {
- return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
+ return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress());
}
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index f9ec964..0132818 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -358,7 +358,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
// This can't be true!
assert (perQueue != null);
- if (store.checkPageFileExists(pageId.intValue())) {
+ if (store != null && store.checkPageFileExists(pageId.intValue())) {
// on this case we need to recalculate the records
Page pg = store.createPage(pageId.intValue());
pg.open();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index 02fe1bf..7585165 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -154,7 +154,9 @@ public class ScaleDownHandler {
Transaction tx = new TransactionImpl(storageManager);
- pageStore.disableCleanup();
+ if (pageStore != null) {
+ pageStore.disableCleanup();
+ }
try {
@@ -240,8 +242,10 @@ public class ScaleDownHandler {
return messageCount;
} finally {
- pageStore.enableCleanup();
- pageStore.getCursorProvider().scheduleCleanup();
+ if (pageStore != null) {
+ pageStore.enableCleanup();
+ pageStore.getCursorProvider().scheduleCleanup();
+ }
}
}
@@ -556,6 +560,9 @@ public class ScaleDownHandler {
public boolean lookup(MessageReference reference) throws Exception {
if (reference.isPaged()) {
+ if (store == null) {
+ return false;
+ }
PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID());
if (subscription.contains((PagedReference) reference)) {
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 2e3e2f9..252da69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1481,7 +1481,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString addr = removePrefix(address);
PagingStore store = server.getPagingManager().getPageStore(addr);
- if (!store.checkMemory(new Runnable() {
+ if (store == null) {
+ callback.sendProducerCreditsMessage(credits, address);
+ } else if (!store.checkMemory(new Runnable() {
@Override
public void run() {
callback.sendProducerCreditsMessage(credits, address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
index f47c827..bf8cfb2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -28,6 +28,7 @@ import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
@@ -304,7 +305,7 @@ public class FileMoveManagerTest {
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null);
- PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1);
+ PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
managerImpl.start();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java
index 3f6c252..a9acce4 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.broker.ConnectionContext;
@@ -153,7 +154,11 @@ public class DestinationProxy implements Destination {
@Override
public long getUsage() {
try {
- return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize();
+ final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress());
+ if (pageStore == null) {
+ return 0;
+ }
+ return pageStore.getAddressSize();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -221,9 +226,13 @@ public class DestinationProxy implements Destination {
@Override
public int getPercentUsage() {
- long total = 0;
+ final long total;
try {
- total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize();
+ final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress());
+ if (pageStore == null) {
+ return 0;
+ }
+ total = pageStore.getMaxSize();
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index 4c77532..d4cbdd3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -19,15 +19,23 @@ package org.apache.activemq.artemis.tests.integration.paging;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
@@ -192,4 +200,91 @@ public class GlobalPagingTest extends PagingTest {
session.commit();
}
+ @Test
+ public void testManagementAddressCannotPageOrChangeGlobalSize() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1);
+
+ try {
+ final SimpleString managementAddress = server.getConfiguration().getManagementAddress();
+ server.getConfiguration().setGlobalMaxSize(1);
+ server.start();
+
+ final ServerLocator locator = createInVMNonHALocator()
+ .setBlockOnNonDurableSend(true)
+ .setBlockOnDurableSend(true)
+ .setBlockOnAcknowledge(true);
+
+ try (ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true)) {
+
+ session.start();
+
+ if (server.locateQueue(managementAddress) == null) {
+
+ session.createQueue(managementAddress, managementAddress, null, true);
+ }
+
+ final Queue managementQueue = server.locateQueue(managementAddress);
+
+ Assert.assertNull(managementQueue.getPageSubscription());
+
+ Assert.assertNull(server.getPagingManager().getPageStore(managementAddress));
+
+ final SimpleString address = SimpleString.toSimpleString("queue");
+
+ if (server.locateQueue(address) == null) {
+
+ session.createQueue(address, address, null, true);
+ }
+
+ final CountDownLatch startSendMessages = new CountDownLatch(1);
+
+ final PagingManager pagingManager = server.getPagingManager();
+
+ final long globalSize = pagingManager.getGlobalSize();
+
+ final Thread globalSizeChecker = new Thread(() -> {
+ startSendMessages.countDown();
+ while (!Thread.currentThread().isInterrupted()) {
+ Assert.assertEquals(globalSize, pagingManager.getGlobalSize());
+ }
+ });
+
+ globalSizeChecker.start();
+
+ try (ClientRequestor requestor = new ClientRequestor(session, managementAddress)) {
+
+ ClientMessage message = session.createMessage(false);
+
+ ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount");
+
+ Assert.assertTrue("bodySize = " + message.getBodySize() + " must be > of globalMaxSize = " + server.getConfiguration().getGlobalMaxSize(), message.getBodySize() > server.getConfiguration().getGlobalMaxSize());
+
+ startSendMessages.await();
+
+ for (int i = 0; i < 100; i++) {
+ try {
+ ClientMessage reply = requestor.request(message);
+ Assert.assertEquals(0L, ManagementHelper.getResult(reply));
+ } catch (ActiveMQAddressFullException e) {
+ Assert.fail(e.getMessage());
+ return;
+ }
+ }
+
+ } finally {
+ globalSizeChecker.interrupt();
+ }
+ }
+
+ } finally {
+ server.stop(true);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 07c02c3..c27ea68 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -598,7 +598,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
final ExecutorFactory executorFactory,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
- PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository);
+ PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
paging.start();
return paging;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ade0003c/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 830e61f..40202e5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -70,7 +70,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
- final Bindings bind = new BindingsImpl(null, null, null);
+ final Bindings bind = new BindingsImpl(null, null);
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));