You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/10 19:13:48 UTC
svn commit: r1481106 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/usage/
activemq-client/src/main/java/org/apache/activemq/usage/
activemq-client/src/test/java/org/apache/activemq/usage/
Author: tabish
Date: Fri May 10 17:13:48 2013
New Revision: 1481106
URL: http://svn.apache.org/r1481106
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4512
Usage and MemoryUsage sync fixes to keep state consistent during thread contention. Test case added.
Added:
activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java (with props)
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java?rev=1481106&r1=1481105&r2=1481106&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java Fri May 10 17:13:48 2013
@@ -22,9 +22,9 @@ import org.apache.activemq.store.Persist
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled. Main use case is manage
* memory usage.
- *
+ *
* @org.apache.xbean.XBean
- *
+ *
*/
public class StoreUsage extends Usage<StoreUsage> {
@@ -44,6 +44,7 @@ public class StoreUsage extends Usage<St
this.store = parent.store;
}
+ @Override
protected long retrieveUsage() {
if (store == null)
return 0;
@@ -61,9 +62,12 @@ public class StoreUsage extends Usage<St
@Override
public int getPercentUsage() {
- synchronized (usageMutex) {
+ usageLock.writeLock().lock();
+ try {
percentUsage = caclPercentUsage();
return super.getPercentUsage();
+ } finally {
+ usageLock.writeLock().unlock();
}
}
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=1481106&r1=1481105&r2=1481106&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java Fri May 10 17:13:48 2013
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.usage;
+import java.util.concurrent.TimeUnit;
+
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled. Main use case is manage
@@ -63,14 +65,26 @@ public class MemoryUsage extends Usage<M
if (parent != null) {
parent.waitForSpace();
}
- synchronized (usageMutex) {
- while (percentUsage >= 100 && isStarted()) {
- usageMutex.wait();
+ usageLock.readLock().lock();
+ try {
+ if (percentUsage >= 100 && isStarted()) {
+ usageLock.readLock().unlock();
+ usageLock.writeLock().lock();
+ try {
+ while (percentUsage >= 100 && isStarted()) {
+ waitForSpaceCondition.await();
+ }
+ usageLock.readLock().lock();
+ } finally {
+ usageLock.writeLock().unlock();
+ }
}
if (percentUsage >= 100 && !isStarted()) {
throw new InterruptedException("waitForSpace stopped during wait.");
}
+ } finally {
+ usageLock.readLock().unlock();
}
}
@@ -86,11 +100,24 @@ public class MemoryUsage extends Usage<M
return false;
}
}
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
if (percentUsage >= 100) {
- usageMutex.wait(timeout);
+ usageLock.readLock().unlock();
+ usageLock.writeLock().lock();
+ try {
+ while (percentUsage >= 100 ) {
+ waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ usageLock.readLock().lock();
+ } finally {
+ usageLock.writeLock().unlock();
+ }
}
+
return percentUsage < 100;
+ } finally {
+ usageLock.readLock().unlock();
}
}
@@ -99,8 +126,11 @@ public class MemoryUsage extends Usage<M
if (parent != null && parent.isFull()) {
return true;
}
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
return percentUsage >= 100;
+ } finally {
+ usageLock.readLock().unlock();
}
}
@@ -125,12 +155,15 @@ public class MemoryUsage extends Usage<M
if (value == 0) {
return;
}
- int percentUsage;
- synchronized (usageMutex) {
+
+ usageLock.writeLock().lock();
+ try {
usage += value;
- percentUsage = caclPercentUsage();
+ setPercentUsage(caclPercentUsage());
+ } finally {
+ usageLock.writeLock().unlock();
}
- setPercentUsage(percentUsage);
+
if (parent != null) {
parent.increaseUsage(value);
}
@@ -145,12 +178,15 @@ public class MemoryUsage extends Usage<M
if (value == 0) {
return;
}
- int percentUsage;
- synchronized (usageMutex) {
+
+ usageLock.writeLock().lock();
+ try {
usage -= value;
- percentUsage = caclPercentUsage();
+ setPercentUsage(caclPercentUsage());
+ } finally {
+ usageLock.writeLock().unlock();
}
- setPercentUsage(percentUsage);
+
if (parent != null) {
parent.decreaseUsage(value);
}
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java?rev=1481106&r1=1481105&r2=1481106&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java Fri May 10 17:13:48 2013
@@ -16,21 +16,22 @@
*/
package org.apache.activemq.usage;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.activemq.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Used to keep track of how much of something is being used so that a
- * productive working set usage can be controlled. Main use case is manage
- * memory usage.
+ * Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
+ * Main use case is to manage memory usage.
*
* @org.apache.xbean.XBean
*
@@ -38,25 +39,29 @@ import org.slf4j.LoggerFactory;
public abstract class Usage<T extends Usage> implements Service {
private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
- protected final Object usageMutex = new Object();
+
+ protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
+ protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
protected int percentUsage;
protected T parent;
+ protected String name;
+
private UsageCapacity limiter = new DefaultUsageCapacity();
private int percentUsageMinDelta = 1;
private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
private final boolean debug = LOG.isDebugEnabled();
- protected String name;
private float usagePortion = 1.0f;
private final List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100;
- private final AtomicBoolean started=new AtomicBoolean();
+ private final AtomicBoolean started = new AtomicBoolean();
private ThreadPoolExecutor executor;
+
public Usage(T parent, String name, float portion) {
this.parent = parent;
this.usagePortion = portion;
if (parent != null) {
- this.limiter.setLimit((long)(parent.getLimit() * portion));
+ this.limiter.setLimit((long) (parent.getLimit() * portion));
name = parent.name + ":" + name;
}
this.name = name;
@@ -86,15 +91,16 @@ public abstract class Usage<T extends Us
return false;
}
}
- synchronized (usageMutex) {
- percentUsage=caclPercentUsage();
+ usageLock.writeLock().lock();
+ try {
+ percentUsage = caclPercentUsage();
if (percentUsage >= highWaterMark) {
long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
long timeleft = deadline;
while (timeleft > 0) {
- percentUsage=caclPercentUsage();
+ percentUsage = caclPercentUsage();
if (percentUsage >= highWaterMark) {
- usageMutex.wait(pollingTime);
+ waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS);
timeleft = deadline - System.currentTimeMillis();
} else {
break;
@@ -102,6 +108,8 @@ public abstract class Usage<T extends Us
}
}
return percentUsage < highWaterMark;
+ } finally {
+ usageLock.writeLock().unlock();
}
}
@@ -113,9 +121,12 @@ public abstract class Usage<T extends Us
if (parent != null && parent.isFull(highWaterMark)) {
return true;
}
- synchronized (usageMutex) {
- percentUsage=caclPercentUsage();
+ usageLock.writeLock().lock();
+ try {
+ percentUsage = caclPercentUsage();
return percentUsage >= highWaterMark;
+ } finally {
+ usageLock.writeLock().unlock();
}
}
@@ -128,16 +139,18 @@ public abstract class Usage<T extends Us
}
public long getLimit() {
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
return limiter.getLimit();
+ } finally {
+ usageLock.readLock().unlock();
}
}
/**
- * Sets the memory limit in bytes. Setting the limit in bytes will set the
- * usagePortion to 0 since the UsageManager is not going to be portion based
- * off the parent.
- * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
+ * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
+ * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
+ * "1g" can be used
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
@@ -145,9 +158,12 @@ public abstract class Usage<T extends Us
if (percentUsageMinDelta < 0) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
- synchronized (usageMutex) {
+ usageLock.writeLock().lock();
+ try {
this.limiter.setLimit(limit);
this.usagePortion = 0;
+ } finally {
+ usageLock.writeLock().unlock();
}
onLimitChange();
}
@@ -155,52 +171,67 @@ public abstract class Usage<T extends Us
protected void onLimitChange() {
// We may need to calculate the limit
if (usagePortion > 0 && parent != null) {
- synchronized (usageMutex) {
- this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
+ usageLock.writeLock().lock();
+ try {
+ this.limiter.setLimit((long) (parent.getLimit() * usagePortion));
+ } finally {
+ usageLock.writeLock().unlock();
}
}
// Reset the percent currently being used.
- int percentUsage;
- synchronized (usageMutex) {
- percentUsage = caclPercentUsage();
+ usageLock.writeLock().lock();
+ try {
+ setPercentUsage(caclPercentUsage());
+ } finally {
+ usageLock.writeLock().unlock();
}
- setPercentUsage(percentUsage);
// Let the children know that the limit has changed. They may need to
- // set
- // their limits based on ours.
+ // set their limits based on ours.
for (T child : children) {
child.onLimitChange();
}
}
public float getUsagePortion() {
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
return usagePortion;
+ } finally {
+ usageLock.readLock().unlock();
}
}
public void setUsagePortion(float usagePortion) {
- synchronized (usageMutex) {
+ usageLock.writeLock().lock();
+ try {
this.usagePortion = usagePortion;
+ } finally {
+ usageLock.writeLock().unlock();
}
onLimitChange();
}
public int getPercentUsage() {
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
return percentUsage;
+ } finally {
+ usageLock.readLock().unlock();
}
}
public int getPercentUsageMinDelta() {
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
return percentUsageMinDelta;
+ } finally {
+ usageLock.readLock().unlock();
}
}
/**
- * Sets the minimum number of percentage points the usage has to change
- * before a UsageListener event is fired by the manager.
+ * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
+ * manager.
*
* @param percentUsageMinDelta
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
@@ -209,27 +240,35 @@ public abstract class Usage<T extends Us
if (percentUsageMinDelta < 1) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
}
- int percentUsage;
- synchronized (usageMutex) {
+
+ usageLock.writeLock().lock();
+ try {
this.percentUsageMinDelta = percentUsageMinDelta;
- percentUsage = caclPercentUsage();
+ setPercentUsage(caclPercentUsage());
+ } finally {
+ usageLock.writeLock().unlock();
}
- setPercentUsage(percentUsage);
}
public long getUsage() {
- synchronized (usageMutex) {
+ usageLock.readLock().lock();
+ try {
return retrieveUsage();
+ } finally {
+ usageLock.readLock().unlock();
}
}
protected void setPercentUsage(int value) {
- synchronized (usageMutex) {
+ usageLock.writeLock().lock();
+ try {
int oldValue = percentUsage;
percentUsage = value;
if (oldValue != value) {
fireEvent(oldValue, value);
}
+ } finally {
+ usageLock.writeLock().unlock();
}
}
@@ -237,26 +276,23 @@ public abstract class Usage<T extends Us
if (limiter.getLimit() == 0) {
return 0;
}
- return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
+ return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
}
+ // Must be called with the usage lock's writeLock held.
private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
if (debug) {
- LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: "
- + newPercentUsage + "% of available memory");
+ LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
}
if (started.get()) {
// Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) {
- synchronized (usageMutex) {
- usageMutex.notifyAll();
- if (!callbacks.isEmpty()) {
- for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
- Runnable callback = iter.next();
- getExecutor().execute(callback);
- }
- callbacks.clear();
+ waitForSpaceCondition.signalAll();
+ if (!callbacks.isEmpty()) {
+ for (Runnable callback : callbacks) {
+ getExecutor().execute(callback);
}
+ callbacks.clear();
}
}
if (!listeners.isEmpty()) {
@@ -264,9 +300,8 @@ public abstract class Usage<T extends Us
Runnable listenerNotifier = new Runnable() {
@Override
public void run() {
- for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
- UsageListener l = iter.next();
- l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+ for (UsageListener listener : listeners) {
+ listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
}
}
};
@@ -285,24 +320,21 @@ public abstract class Usage<T extends Us
@Override
public String toString() {
- return "Usage(" + getName() + ") percentUsage=" + percentUsage
- + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
- + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
- + (parent != null ? ";Parent:" + parent.toString() : "");
+ return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+ + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
}
@Override
@SuppressWarnings("unchecked")
public void start() {
- if (started.compareAndSet(false, true)){
+ if (started.compareAndSet(false, true)) {
if (parent != null) {
parent.addChild(this);
- if(getLimit() > parent.getLimit()) {
- LOG.info("Usage({}) limit={} should be smaller than its parent limit={}",
- new Object[]{getName(), getLimit(), parent.getLimit()});
+ if (getLimit() > parent.getLimit()) {
+ LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
}
}
- for (T t:children) {
+ for (T t : children) {
t.start();
}
}
@@ -311,21 +343,24 @@ public abstract class Usage<T extends Us
@Override
@SuppressWarnings("unchecked")
public void stop() {
- if (started.compareAndSet(true, false)){
+ if (started.compareAndSet(true, false)) {
if (parent != null) {
parent.removeChild(this);
}
- //clear down any callbacks
- synchronized (usageMutex) {
- usageMutex.notifyAll();
- for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
- Runnable callback = iter.next();
+ // clear down any callbacks
+ usageLock.writeLock().lock();
+ try {
+ waitForSpaceCondition.signalAll();
+ for (Runnable callback : this.callbacks) {
callback.run();
}
this.callbacks.clear();
+ } finally {
+ usageLock.writeLock().unlock();
}
- for (T t:children) {
+
+ for (T t : children) {
t.stop();
}
}
@@ -344,8 +379,7 @@ public abstract class Usage<T extends Us
/**
* @param callback
- * @return true if the UsageManager was full. The callback will only be
- * called if this method returns true.
+ * @return true if the UsageManager was full. The callback will only be called if this method returns true.
*/
public boolean notifyCallbackWhenNotFull(final Runnable callback) {
if (parent != null) {
@@ -353,12 +387,15 @@ public abstract class Usage<T extends Us
@Override
public void run() {
- synchronized (usageMutex) {
+ usageLock.writeLock().lock();
+ try {
if (percentUsage >= 100) {
callbacks.add(callback);
} else {
callback.run();
}
+ } finally {
+ usageLock.writeLock().unlock();
}
}
};
@@ -366,13 +403,16 @@ public abstract class Usage<T extends Us
return true;
}
}
- synchronized (usageMutex) {
+ usageLock.writeLock().lock();
+ try {
if (percentUsage >= 100) {
callbacks.add(callback);
return true;
} else {
return false;
}
+ } finally {
+ usageLock.writeLock().unlock();
}
}
@@ -384,7 +424,8 @@ public abstract class Usage<T extends Us
}
/**
- * @param limiter the limiter to set
+ * @param limiter
+ * the limiter to set
*/
public void setLimiter(UsageCapacity limiter) {
this.limiter = limiter;
@@ -398,7 +439,8 @@ public abstract class Usage<T extends Us
}
/**
- * @param pollingTime the pollingTime to set
+ * @param pollingTime
+ * the pollingTime to set
*/
public void setPollingTime(int pollingTime) {
this.pollingTime = pollingTime;
@@ -416,7 +458,7 @@ public abstract class Usage<T extends Us
this.parent = parent;
}
- public void setExecutor (ThreadPoolExecutor executor) {
+ public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}
Added: activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java?rev=1481106&view=auto
==============================================================================
--- activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java (added)
+++ activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java Fri May 10 17:13:48 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+public class MemoryUsageConcurrencyTest {
+
+ @Test
+ public void testCycle() throws Exception {
+ Random r = new Random(0xb4a14);
+ for (int i = 0; i < 50000; i++) {
+ checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
+ }
+ }
+
+ private void checkPercentage(int attempt, int seed, int operations, boolean useArrayBlocking, boolean useWaitForSpaceThread) throws InterruptedException {
+
+ final BlockingQueue<Integer> toAdd;
+ final BlockingQueue<Integer> toRemove;
+ final BlockingQueue<Integer> removed;
+
+ if (useArrayBlocking) {
+ toAdd = new ArrayBlockingQueue<Integer>(operations);
+ toRemove = new ArrayBlockingQueue<Integer>(operations);
+ removed = new ArrayBlockingQueue<Integer>(operations);
+ } else {
+ toAdd = new LinkedBlockingQueue<Integer>();
+ toRemove = new LinkedBlockingQueue<Integer>();
+ removed = new LinkedBlockingQueue<Integer>();
+ }
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final CountDownLatch startLatch = new CountDownLatch(1);
+
+ final MemoryUsage memUsage = new MemoryUsage();
+ memUsage.setLimit(1000);
+ memUsage.start();
+
+ Thread addThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+
+ while (true) {
+ Integer add = toAdd.poll(1, TimeUnit.MILLISECONDS);
+ if (add == null) {
+ if (!running.get()) {
+ break;
+ }
+ } else {
+ // add to other queue before removing
+ toRemove.add(add);
+ memUsage.increaseUsage(add);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Thread removeThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+
+ while (true) {
+ Integer remove = toRemove.poll(1, TimeUnit.MILLISECONDS);
+ if (remove == null) {
+ if (!running.get()) {
+ break;
+ }
+ } else {
+ memUsage.decreaseUsage(remove);
+ removed.add(remove);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Thread waitForSpaceThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+
+ while (running.get()) {
+ memUsage.waitForSpace();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ removeThread.start();
+ addThread.start();
+ if (useWaitForSpaceThread) {
+ waitForSpaceThread.start();
+ }
+
+ Random r = new Random(seed);
+
+ startLatch.countDown();
+
+ for (int i = 0; i < operations; i++) {
+ toAdd.add(r.nextInt(100) + 1);
+ }
+
+ // we expect the failure percentage to be related to the last operation
+ List<Integer> ops = new ArrayList<Integer>(operations);
+ for (int i = 0; i < operations; i++) {
+ Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(op);
+ ops.add(op);
+ }
+
+ running.set(false);
+
+ if (useWaitForSpaceThread) {
+ try {
+ waitForSpaceThread.join(1000);
+ } catch (InterruptedException e) {
+ System.out.println("Attempt: " + attempt + " : " + memUsage + " waitForSpace never returned");
+ waitForSpaceThread.interrupt();
+ waitForSpaceThread.join();
+ }
+ }
+
+ removeThread.join();
+ addThread.join();
+
+ if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() != memUsage.getPercentUsage()) {
+ System.out.println("Attempt: " + attempt + " : " + memUsage);
+ System.out.println("Operations: " + ops);
+ assertEquals(0, memUsage.getPercentUsage());
+ }
+ }
+}
Propchange: activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
------------------------------------------------------------------------------
svn:eol-style = native