You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/01 01:39:29 UTC
[2/3] activemq-artemis git commit: ARTEMIS-1732 I simplified some of
the changes performed at the previous commit.
ARTEMIS-1732 I simplified some of the changes performed at the previous commit.
Also I changed GlobalDiskFullTest to actually block the senders.
I moved the Runnables from PagingManager into the Util as AtomicRunnable.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e36e072
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e36e072
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e36e072
Branch: refs/heads/master
Commit: 0e36e072bdf0c4636623aacbd15912857770c73f
Parents: 53e1d60
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jul 31 21:08:46 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 31 21:39:04 2018 -0400
----------------------------------------------------------------------
.../artemis/utils/runnables/AtomicRunnable.java | 47 ++++++++++++
.../runnables/AtomicRunnableWithDelegate.java | 32 ++++++++
.../amqp/broker/AMQPSessionCallback.java | 10 +--
.../artemis/core/paging/PagingManager.java | 59 ++------------
.../artemis/core/paging/PagingStore.java | 7 +-
.../core/paging/impl/PagingManagerImpl.java | 81 ++++++++++----------
.../core/paging/impl/PagingStoreImpl.java | 23 ++++--
.../core/server/ActiveMQServerLogger.java | 13 ----
.../core/server/files/FileStoreMonitor.java | 9 +--
.../core/server/files/FileStoreMonitorTest.java | 10 +++
.../integration/amqp/GlobalDiskFullTest.java | 75 ++++++++++++++----
.../tests/unit/util/FakePagingManager.java | 12 +--
12 files changed, 232 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
new file mode 100644
index 0000000..f1f53ce
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+public abstract class AtomicRunnable implements Runnable {
+
+
+ public static Runnable checkAtomic(Runnable run) {
+ if (run instanceof AtomicRunnable) {
+ return run;
+ } else {
+ return new AtomicRunnableWithDelegate(run);
+ }
+ }
+
+ private volatile int ran;
+
+ private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
+ AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
+
+ @Override
+ public void run() {
+ if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
+ atomicRun();
+ }
+ }
+
+ public abstract void atomicRun();
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
new file mode 100644
index 0000000..d1583de
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+public class AtomicRunnableWithDelegate extends AtomicRunnable {
+
+ private final Runnable runnable;
+
+ public AtomicRunnableWithDelegate(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ @Override
+ public void atomicRun() {
+ runnable.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 1f5ccbc..86c0687 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -581,9 +581,7 @@ public class AMQPSessionCallback implements SessionCallback {
Runnable creditRunnable = () -> {
connection.lock();
try {
- if (receiver.getRemoteCredit() <= threshold) {
- receiver.flow(credits);
- }
+ receiver.flow(credits);
} finally {
connection.unlock();
}
@@ -592,10 +590,10 @@ public class AMQPSessionCallback implements SessionCallback {
if (address == null) {
pagingManager.checkMemory(creditRunnable);
- return;
+ } else {
+ final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
+ store.checkMemory(creditRunnable);
}
- final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
- store.checkMemory(creditRunnable);
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index c8eb2ec..5d8461e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -17,9 +17,6 @@
package org.apache.activemq.artemis.core.paging;
import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -82,7 +79,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
void resumeCleanup();
- void addBlockedStore(Blockable store);
+ void addBlockedStore(PagingStore store);
void injectMonitor(FileStoreMonitor monitor) throws Exception;
@@ -114,54 +111,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
return 0;
}
- boolean checkMemory(Runnable runnable);
-
- // To be used when the memory is oversized either by local settings or global settings on blocking addresses
- final class OverSizedRunnable implements Runnable {
-
- private final AtomicBoolean ran = new AtomicBoolean(false);
-
- private final Runnable runnable;
-
- public OverSizedRunnable(final Runnable runnable) {
- this.runnable = runnable;
- }
-
- @Override
- public void run() {
- if (ran.compareAndSet(false, true)) {
- runnable.run();
- }
- }
- }
-
- interface Blockable {
- /**
- * It will return true if the destination is leaving blocking.
- */
- boolean checkReleasedMemory();
- }
-
- final class MemoryFreedRunnablesExecutor implements Runnable {
-
- private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
-
- public void addRunnable(PagingManager.OverSizedRunnable runnable) {
- onMemoryFreedRunnables.add(runnable);
- }
-
- @Override
- public void run() {
- Runnable runnable;
-
- while ((runnable = onMemoryFreedRunnables.poll()) != null) {
- runnable.run();
- }
- }
-
- public boolean isEmpty() {
- return onMemoryFreedRunnables.isEmpty();
- }
- }
+ /**
+ * Use this when you have no reference to an address (anonymous AMQP producers, for example).
+ * @param runWhenAvailable
+ */
+ void checkMemory(Runnable runWhenAvailable);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 27e8c0f..4dd8bf8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*
* @see PagingManager
*/
-public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable {
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
SimpleString getAddress();
@@ -132,6 +132,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener,
boolean isRejectingMessages();
/**
+ * Returns true if the destination is leaving the blocking state.
+ */
+ boolean checkReleasedMemory();
+
+ /**
* Write lock the PagingStore.
*
* @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 878f918..8893984 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -20,8 +20,10 @@ import java.nio.file.FileStore;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
public final class PagingManagerImpl implements PagingManager {
@@ -57,7 +60,7 @@ public final class PagingManagerImpl implements PagingManager {
*/
private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
- private final Set<Blockable> blockedStored = new ConcurrentHashSet<>();
+ private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
@@ -75,13 +78,14 @@ public final class PagingManagerImpl implements PagingManager {
private volatile boolean diskFull = false;
+ private final Executor memoryExecutor;
+
+ private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
+
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
private ActiveMQScheduledComponent scheduledComponent = null;
- private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
-
- private final Executor executor;
// Static
// --------------------------------------------------------------------------------------------------------------------------
@@ -106,7 +110,7 @@ public final class PagingManagerImpl implements PagingManager {
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
- executor = pagingStoreFactory.newExecutor();
+ this.memoryExecutor = pagingSPI.newExecutor();
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
@@ -115,7 +119,7 @@ public final class PagingManagerImpl implements PagingManager {
}
@Override
- public void addBlockedStore(Blockable store) {
+ public void addBlockedStore(PagingStore store) {
blockedStored.add(store);
}
@@ -157,42 +161,18 @@ public final class PagingManagerImpl implements PagingManager {
return globalSizeBytes.get();
}
- @Override
- public boolean checkMemory(final Runnable runWhenAvailable) {
- if (isGlobalFull()) {
- OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
-
- memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
- addBlockedStore(() -> {
- if (!isGlobalFull()) {
- if (!memoryFreedRunnablesExecutor.isEmpty()) {
- executor.execute(memoryFreedRunnablesExecutor);
- ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize());
- return true;
- }
- }
- return false;
- });
-
- if (isDiskFull()) {
- ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull();
- } else {
- ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize());
- }
-
- return true;
- }
-
- runWhenAvailable.run();
-
- return true;
- }
-
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
- Iterator<Blockable> storeIterator = blockedStored.iterator();
+ if (!memoryCallback.isEmpty()) {
+ if (memoryExecutor != null) {
+ memoryExecutor.execute(this::memoryReleased);
+ } else {
+ memoryReleased();
+ }
+ }
+ Iterator<PagingStore> storeIterator = blockedStored.iterator();
while (storeIterator.hasNext()) {
- Blockable store = storeIterator.next();
+ PagingStore store = storeIterator.next();
if (store.checkReleasedMemory()) {
storeIterator.remove();
}
@@ -223,7 +203,7 @@ public final class PagingManagerImpl implements PagingManager {
@Override
public void under(FileStore store, double usage) {
- if (diskFull) {
+ if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
ActiveMQServerLogger.LOGGER.diskCapacityRestored();
diskFull = false;
checkMemoryRelease();
@@ -242,6 +222,27 @@ public final class PagingManagerImpl implements PagingManager {
}
@Override
+ public void checkMemory(final Runnable runWhenAvailable) {
+
+ if (isGlobalFull()) {
+ memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+ return;
+ }
+ runWhenAvailable.run();
+ }
+
+
+ private void memoryReleased() {
+ Runnable runnable;
+
+ while ((runnable = memoryCallback.poll()) != null) {
+ runnable.run();
+ }
+ }
+
+
+
+ @Override
public boolean isGlobalFull() {
return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 74212ce..5f0d3c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -23,7 +23,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +62,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
/**
@@ -639,7 +642,16 @@ public class PagingStoreImpl implements PagingStore {
}
- private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
+ private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
+
+ private void memoryReleased() {
+ Runnable runnable;
+
+ while ((runnable = onMemoryFreedRunnables.poll()) != null) {
+ runnable.run();
+ }
+ }
+
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
@@ -650,9 +662,8 @@ public class PagingStoreImpl implements PagingStore {
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
- PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable);
- memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
+ onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
// We check again to avoid a race condition where the size can come down just after the element
// has been added, but the check to execute was done before the element was added
@@ -660,7 +671,7 @@ public class PagingStoreImpl implements PagingStore {
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) {
// run it now
- ourRunnable.run();
+ runWhenAvailable.run();
} else {
if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
pagingManager.addBlockedStore(this);
@@ -719,8 +730,8 @@ public class PagingStoreImpl implements PagingStore {
public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
- if (!memoryFreedRunnablesExecutor.isEmpty()) {
- executor.execute(memoryFreedRunnablesExecutor);
+ if (!onMemoryFreedRunnables.isEmpty()) {
+ executor.execute(this::memoryReleased);
if (blocking.get()) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 96fffe5..b10d652 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1950,17 +1950,4 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
void consumerCountError(String reason);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)
- void blockingGlobalDiskFull();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
- void blockingGlobalMessageProduction(long globalSize);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
- void unblockingGlobalMessageProduction(long globalSize);
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 957661c..ad59117 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -150,14 +150,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public interface Callback {
- default void tick(FileStore store, double usage) {
- }
+ void tick(FileStore store, double usage);
- default void over(FileStore store, double usage) {
- }
+ void over(FileStore store, double usage);
- default void under(FileStore store, double usage) {
- }
+ void under(FileStore store, double usage);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
index e4f27c3..b91d3de 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -137,6 +137,16 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
System.out.println("TickS::" + usage);
latch.countDown();
}
+
+ @Override
+ public void over(FileStore store, double usage) {
+
+ }
+
+ @Override
+ public void under(FileStore store, double usage) {
+
+ }
});
storeMonitor.start();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
index d664013..0e0f86d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
@@ -45,6 +46,11 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback(new FileStoreMonitor.Callback() {
+
+ @Override
+ public void tick(FileStore store, double usage) {
+ }
+
@Override
public void over(FileStore store, double usage) {
latch.countDown();
@@ -53,7 +59,8 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
public void under(FileStore store, double usage) {
}
});
- latch.await(2, TimeUnit.SECONDS);
+
+ Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());
@@ -61,27 +68,65 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
- final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[1000];
- message.setBytes(payload);
-
- sender.setSendTimeout(1000);
- sender.send(message);
- org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName());
- assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
AmqpSender anonSender = session.createSender();
- final AmqpMessage message1 = new AmqpMessage();
- message1.setBytes(payload);
- message1.setAddress(getQueueName());
- anonSender.setSendTimeout(1000);
- anonSender.send(message1);
+ CountDownLatch sentWithName = new CountDownLatch(1);
+ CountDownLatch sentAnon = new CountDownLatch(1);
+
+ Thread threadWithName = new Thread() {
+ @Override
+ public void run() {
+
+ try {
+ final AmqpMessage message = new AmqpMessage();
+ message.setBytes(payload);
+ sender.setSendTimeout(-1);
+ sender.send(message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ sentWithName.countDown();
+ }
+ }
+ };
+
+ threadWithName.start();
+
+
+ Thread threadWithAnon = new Thread() {
+ @Override
+ public void run() {
+ try {
+ final AmqpMessage message = new AmqpMessage();
+ message.setBytes(payload);
+ anonSender.setSendTimeout(-1);
+ message.setAddress(getQueueName());
+ anonSender.send(message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ sentAnon.countDown();
+ }
+ }
+ };
+
+ threadWithAnon.start();
+
+ Assert.assertFalse("Thread sender should be blocked", sentWithName.await(500, TimeUnit.MILLISECONDS));
+ Assert.assertFalse("Thread sender anonymous should be blocked", sentAnon.await(500, TimeUnit.MILLISECONDS));
+
+ monitor.setMaxUsage(100.0);
- queueView = getProxyToQueue(getQueueName());
- assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
+ Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30, TimeUnit.SECONDS));
+ threadWithName.join(TimeUnit.SECONDS.toMillis(30));
+ threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
+ Assert.assertFalse(threadWithName.isAlive());
+ Assert.assertFalse(threadWithAnon.isAlive());
} finally {
connection.close();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index 3431655..94a9d79 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -30,7 +30,12 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
public final class FakePagingManager implements PagingManager {
@Override
- public void addBlockedStore(Blockable store) {
+ public void addBlockedStore(PagingStore store) {
+
+ }
+
+ @Override
+ public void checkMemory(Runnable runWhenAvailable) {
}
@@ -115,11 +120,6 @@ public final class FakePagingManager implements PagingManager {
return false;
}
- @Override
- public boolean checkMemory(Runnable runnable) {
- return false;
- }
-
/*
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()