You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/10 19:13:48 UTC

svn commit: r1481106 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/usage/ activemq-client/src/main/java/org/apache/activemq/usage/ activemq-client/src/test/java/org/apache/activemq/usage/

Author: tabish
Date: Fri May 10 17:13:48 2013
New Revision: 1481106

URL: http://svn.apache.org/r1481106
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4512

Synchronization fixes in Usage and MemoryUsage to keep their state consistent under thread contention. A test case has been added.

Added:
    activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java?rev=1481106&r1=1481105&r2=1481106&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java Fri May 10 17:13:48 2013
@@ -22,9 +22,9 @@ import org.apache.activemq.store.Persist
  * Used to keep track of how much of something is being used so that a
  * productive working set usage can be controlled. Main use case is manage
  * memory usage.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class StoreUsage extends Usage<StoreUsage> {
 
@@ -44,6 +44,7 @@ public class StoreUsage extends Usage<St
         this.store = parent.store;
     }
 
+    @Override
     protected long retrieveUsage() {
         if (store == null)
             return 0;
@@ -61,9 +62,12 @@ public class StoreUsage extends Usage<St
 
     @Override
     public int getPercentUsage() {
-        synchronized (usageMutex) {
+        usageLock.writeLock().lock();
+        try {
             percentUsage = caclPercentUsage();
             return super.getPercentUsage();
+        } finally {
+            usageLock.writeLock().unlock();
         }
     }
 

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=1481106&r1=1481105&r2=1481106&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java Fri May 10 17:13:48 2013
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.usage;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Used to keep track of how much of something is being used so that a
  * productive working set usage can be controlled. Main use case is manage
@@ -63,14 +65,26 @@ public class MemoryUsage extends Usage<M
         if (parent != null) {
             parent.waitForSpace();
         }
-        synchronized (usageMutex) {
-            while (percentUsage >= 100 && isStarted()) {
-                usageMutex.wait();
+        usageLock.readLock().lock();
+        try {
+            if (percentUsage >= 100 && isStarted()) {
+                usageLock.readLock().unlock();
+                usageLock.writeLock().lock();
+                try {
+                    while (percentUsage >= 100 && isStarted()) {
+                        waitForSpaceCondition.await();
+                    }
+                    usageLock.readLock().lock();
+                } finally {
+                    usageLock.writeLock().unlock();
+                }
             }
 
             if (percentUsage >= 100 && !isStarted()) {
                 throw new InterruptedException("waitForSpace stopped during wait.");
             }
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
@@ -86,11 +100,24 @@ public class MemoryUsage extends Usage<M
                 return false;
             }
         }
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             if (percentUsage >= 100) {
-                usageMutex.wait(timeout);
+                usageLock.readLock().unlock();
+                usageLock.writeLock().lock();
+                try {
+                    while (percentUsage >= 100 ) {
+                        waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
+                    }
+                    usageLock.readLock().lock();
+                } finally {
+                    usageLock.writeLock().unlock();
+                }
             }
+
             return percentUsage < 100;
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
@@ -99,8 +126,11 @@ public class MemoryUsage extends Usage<M
         if (parent != null && parent.isFull()) {
             return true;
         }
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             return percentUsage >= 100;
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
@@ -125,12 +155,15 @@ public class MemoryUsage extends Usage<M
         if (value == 0) {
             return;
         }
-        int percentUsage;
-        synchronized (usageMutex) {
+
+        usageLock.writeLock().lock();
+        try {
             usage += value;
-            percentUsage = caclPercentUsage();
+            setPercentUsage(caclPercentUsage());
+        } finally {
+            usageLock.writeLock().unlock();
         }
-        setPercentUsage(percentUsage);
+
         if (parent != null) {
             parent.increaseUsage(value);
         }
@@ -145,12 +178,15 @@ public class MemoryUsage extends Usage<M
         if (value == 0) {
             return;
         }
-        int percentUsage;
-        synchronized (usageMutex) {
+
+        usageLock.writeLock().lock();
+        try {
             usage -= value;
-            percentUsage = caclPercentUsage();
+            setPercentUsage(caclPercentUsage());
+        } finally {
+            usageLock.writeLock().unlock();
         }
-        setPercentUsage(percentUsage);
+
         if (parent != null) {
             parent.decreaseUsage(value);
         }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java?rev=1481106&r1=1481105&r2=1481106&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java Fri May 10 17:13:48 2013
@@ -16,21 +16,22 @@
  */
 package org.apache.activemq.usage;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.activemq.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Used to keep track of how much of something is being used so that a
- * productive working set usage can be controlled. Main use case is manage
- * memory usage.
+ * Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
+ * Main use case is manage memory usage.
  *
  * @org.apache.xbean.XBean
  *
@@ -38,25 +39,29 @@ import org.slf4j.LoggerFactory;
 public abstract class Usage<T extends Usage> implements Service {
 
     private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
-    protected final Object usageMutex = new Object();
+
+    protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
+    protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
     protected int percentUsage;
     protected T parent;
+    protected String name;
+
     private UsageCapacity limiter = new DefaultUsageCapacity();
     private int percentUsageMinDelta = 1;
     private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
     private final boolean debug = LOG.isDebugEnabled();
-    protected String name;
     private float usagePortion = 1.0f;
     private final List<T> children = new CopyOnWriteArrayList<T>();
     private final List<Runnable> callbacks = new LinkedList<Runnable>();
     private int pollingTime = 100;
-    private final AtomicBoolean started=new AtomicBoolean();
+    private final AtomicBoolean started = new AtomicBoolean();
     private ThreadPoolExecutor executor;
+
     public Usage(T parent, String name, float portion) {
         this.parent = parent;
         this.usagePortion = portion;
         if (parent != null) {
-            this.limiter.setLimit((long)(parent.getLimit() * portion));
+            this.limiter.setLimit((long) (parent.getLimit() * portion));
             name = parent.name + ":" + name;
         }
         this.name = name;
@@ -86,15 +91,16 @@ public abstract class Usage<T extends Us
                 return false;
             }
         }
-        synchronized (usageMutex) {
-            percentUsage=caclPercentUsage();
+        usageLock.writeLock().lock();
+        try {
+            percentUsage = caclPercentUsage();
             if (percentUsage >= highWaterMark) {
                 long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
                 long timeleft = deadline;
                 while (timeleft > 0) {
-                    percentUsage=caclPercentUsage();
+                    percentUsage = caclPercentUsage();
                     if (percentUsage >= highWaterMark) {
-                        usageMutex.wait(pollingTime);
+                        waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS);
                         timeleft = deadline - System.currentTimeMillis();
                     } else {
                         break;
@@ -102,6 +108,8 @@ public abstract class Usage<T extends Us
                 }
             }
             return percentUsage < highWaterMark;
+        } finally {
+            usageLock.writeLock().unlock();
         }
     }
 
@@ -113,9 +121,12 @@ public abstract class Usage<T extends Us
         if (parent != null && parent.isFull(highWaterMark)) {
             return true;
         }
-        synchronized (usageMutex) {
-            percentUsage=caclPercentUsage();
+        usageLock.writeLock().lock();
+        try {
+            percentUsage = caclPercentUsage();
             return percentUsage >= highWaterMark;
+        } finally {
+            usageLock.writeLock().unlock();
         }
     }
 
@@ -128,16 +139,18 @@ public abstract class Usage<T extends Us
     }
 
     public long getLimit() {
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             return limiter.getLimit();
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
     /**
-     * Sets the memory limit in bytes. Setting the limit in bytes will set the
-     * usagePortion to 0 since the UsageManager is not going to be portion based
-     * off the parent.
-     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
+     * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
+     * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
+     * "1g" can be used
      *
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
@@ -145,9 +158,12 @@ public abstract class Usage<T extends Us
         if (percentUsageMinDelta < 0) {
             throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
         }
-        synchronized (usageMutex) {
+        usageLock.writeLock().lock();
+        try {
             this.limiter.setLimit(limit);
             this.usagePortion = 0;
+        } finally {
+            usageLock.writeLock().unlock();
         }
         onLimitChange();
     }
@@ -155,52 +171,67 @@ public abstract class Usage<T extends Us
     protected void onLimitChange() {
         // We may need to calculate the limit
         if (usagePortion > 0 && parent != null) {
-            synchronized (usageMutex) {
-                this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
+            usageLock.writeLock().lock();
+            try {
+                this.limiter.setLimit((long) (parent.getLimit() * usagePortion));
+            } finally {
+                usageLock.writeLock().unlock();
             }
         }
         // Reset the percent currently being used.
-        int percentUsage;
-        synchronized (usageMutex) {
-            percentUsage = caclPercentUsage();
+        usageLock.writeLock().lock();
+        try {
+            setPercentUsage(caclPercentUsage());
+        } finally {
+            usageLock.writeLock().unlock();
         }
-        setPercentUsage(percentUsage);
         // Let the children know that the limit has changed. They may need to
-        // set
-        // their limits based on ours.
+        // set their limits based on ours.
         for (T child : children) {
             child.onLimitChange();
         }
     }
 
     public float getUsagePortion() {
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             return usagePortion;
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
     public void setUsagePortion(float usagePortion) {
-        synchronized (usageMutex) {
+        usageLock.writeLock().lock();
+        try {
             this.usagePortion = usagePortion;
+        } finally {
+            usageLock.writeLock().unlock();
         }
         onLimitChange();
     }
 
     public int getPercentUsage() {
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             return percentUsage;
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
     public int getPercentUsageMinDelta() {
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             return percentUsageMinDelta;
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
     /**
-     * Sets the minimum number of percentage points the usage has to change
-     * before a UsageListener event is fired by the manager.
+     * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
+     * manager.
      *
      * @param percentUsageMinDelta
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
@@ -209,27 +240,35 @@ public abstract class Usage<T extends Us
         if (percentUsageMinDelta < 1) {
             throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
         }
-        int percentUsage;
-        synchronized (usageMutex) {
+
+        usageLock.writeLock().lock();
+        try {
             this.percentUsageMinDelta = percentUsageMinDelta;
-            percentUsage = caclPercentUsage();
+            setPercentUsage(caclPercentUsage());
+        } finally {
+            usageLock.writeLock().unlock();
         }
-        setPercentUsage(percentUsage);
     }
 
     public long getUsage() {
-        synchronized (usageMutex) {
+        usageLock.readLock().lock();
+        try {
             return retrieveUsage();
+        } finally {
+            usageLock.readLock().unlock();
         }
     }
 
     protected void setPercentUsage(int value) {
-        synchronized (usageMutex) {
+        usageLock.writeLock().lock();
+        try {
             int oldValue = percentUsage;
             percentUsage = value;
             if (oldValue != value) {
                 fireEvent(oldValue, value);
             }
+        } finally {
+            usageLock.writeLock().unlock();
         }
     }
 
@@ -237,26 +276,23 @@ public abstract class Usage<T extends Us
         if (limiter.getLimit() == 0) {
             return 0;
         }
-        return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
+        return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
     }
 
+    // Must be called with the usage lock's writeLock held.
     private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
         if (debug) {
-            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: "
-                + newPercentUsage + "% of available memory");
+            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
         }
         if (started.get()) {
             // Switching from being full to not being full..
             if (oldPercentUsage >= 100 && newPercentUsage < 100) {
-                synchronized (usageMutex) {
-                    usageMutex.notifyAll();
-                    if (!callbacks.isEmpty()) {
-                        for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
-                            Runnable callback = iter.next();
-                            getExecutor().execute(callback);
-                        }
-                        callbacks.clear();
+                waitForSpaceCondition.signalAll();
+                if (!callbacks.isEmpty()) {
+                    for (Runnable callback : callbacks) {
+                        getExecutor().execute(callback);
                     }
+                    callbacks.clear();
                 }
             }
             if (!listeners.isEmpty()) {
@@ -264,9 +300,8 @@ public abstract class Usage<T extends Us
                 Runnable listenerNotifier = new Runnable() {
                     @Override
                     public void run() {
-                        for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
-                            UsageListener l = iter.next();
-                            l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+                        for (UsageListener listener : listeners) {
+                            listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
                         }
                     }
                 };
@@ -285,24 +320,21 @@ public abstract class Usage<T extends Us
 
     @Override
     public String toString() {
-        return "Usage(" + getName() + ") percentUsage=" + percentUsage
-                + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
-                + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
-                + (parent != null ? ";Parent:" + parent.toString() : "");
+        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+            + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public void start() {
-        if (started.compareAndSet(false, true)){
+        if (started.compareAndSet(false, true)) {
             if (parent != null) {
                 parent.addChild(this);
-                if(getLimit() > parent.getLimit()) {
-                    LOG.info("Usage({}) limit={} should be smaller than its parent limit={}",
-                             new Object[]{getName(), getLimit(), parent.getLimit()});
+                if (getLimit() > parent.getLimit()) {
+                    LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
                 }
             }
-            for (T t:children) {
+            for (T t : children) {
                 t.start();
             }
         }
@@ -311,21 +343,24 @@ public abstract class Usage<T extends Us
     @Override
     @SuppressWarnings("unchecked")
     public void stop() {
-        if (started.compareAndSet(true, false)){
+        if (started.compareAndSet(true, false)) {
             if (parent != null) {
                 parent.removeChild(this);
             }
 
-            //clear down any callbacks
-            synchronized (usageMutex) {
-                usageMutex.notifyAll();
-                for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
-                    Runnable callback = iter.next();
+            // clear down any callbacks
+            usageLock.writeLock().lock();
+            try {
+                waitForSpaceCondition.signalAll();
+                for (Runnable callback : this.callbacks) {
                     callback.run();
                 }
                 this.callbacks.clear();
+            } finally {
+                usageLock.writeLock().unlock();
             }
-            for (T t:children) {
+
+            for (T t : children) {
                 t.stop();
             }
         }
@@ -344,8 +379,7 @@ public abstract class Usage<T extends Us
 
     /**
      * @param callback
-     * @return true if the UsageManager was full. The callback will only be
-     *         called if this method returns true.
+     * @return true if the UsageManager was full. The callback will only be called if this method returns true.
      */
     public boolean notifyCallbackWhenNotFull(final Runnable callback) {
         if (parent != null) {
@@ -353,12 +387,15 @@ public abstract class Usage<T extends Us
 
                 @Override
                 public void run() {
-                    synchronized (usageMutex) {
+                    usageLock.writeLock().lock();
+                    try {
                         if (percentUsage >= 100) {
                             callbacks.add(callback);
                         } else {
                             callback.run();
                         }
+                    } finally {
+                        usageLock.writeLock().unlock();
                     }
                 }
             };
@@ -366,13 +403,16 @@ public abstract class Usage<T extends Us
                 return true;
             }
         }
-        synchronized (usageMutex) {
+        usageLock.writeLock().lock();
+        try {
             if (percentUsage >= 100) {
                 callbacks.add(callback);
                 return true;
             } else {
                 return false;
             }
+        } finally {
+            usageLock.writeLock().unlock();
         }
     }
 
@@ -384,7 +424,8 @@ public abstract class Usage<T extends Us
     }
 
     /**
-     * @param limiter the limiter to set
+     * @param limiter
+     *            the limiter to set
      */
     public void setLimiter(UsageCapacity limiter) {
         this.limiter = limiter;
@@ -398,7 +439,8 @@ public abstract class Usage<T extends Us
     }
 
     /**
-     * @param pollingTime the pollingTime to set
+     * @param pollingTime
+     *            the pollingTime to set
      */
     public void setPollingTime(int pollingTime) {
         this.pollingTime = pollingTime;
@@ -416,7 +458,7 @@ public abstract class Usage<T extends Us
         this.parent = parent;
     }
 
-    public void setExecutor (ThreadPoolExecutor executor) {
+    public void setExecutor(ThreadPoolExecutor executor) {
         this.executor = executor;
     }
 

Added: activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java?rev=1481106&view=auto
==============================================================================
--- activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java (added)
+++ activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java Fri May 10 17:13:48 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+public class MemoryUsageConcurrencyTest {
+
+    @Test
+    public void testCycle() throws Exception {
+        Random r = new Random(0xb4a14);
+        for (int i = 0; i < 50000; i++) {
+            checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
+        }
+    }
+
+    private void checkPercentage(int attempt, int seed, int operations, boolean useArrayBlocking, boolean useWaitForSpaceThread) throws InterruptedException {
+
+        final BlockingQueue<Integer> toAdd;
+        final BlockingQueue<Integer> toRemove;
+        final BlockingQueue<Integer> removed;
+
+        if (useArrayBlocking) {
+            toAdd = new ArrayBlockingQueue<Integer>(operations);
+            toRemove = new ArrayBlockingQueue<Integer>(operations);
+            removed = new ArrayBlockingQueue<Integer>(operations);
+        } else {
+            toAdd = new LinkedBlockingQueue<Integer>();
+            toRemove = new LinkedBlockingQueue<Integer>();
+            removed = new LinkedBlockingQueue<Integer>();
+        }
+
+        final AtomicBoolean running = new AtomicBoolean(true);
+        final CountDownLatch startLatch = new CountDownLatch(1);
+
+        final MemoryUsage memUsage = new MemoryUsage();
+        memUsage.setLimit(1000);
+        memUsage.start();
+
+        Thread addThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    startLatch.await();
+
+                    while (true) {
+                        Integer add = toAdd.poll(1, TimeUnit.MILLISECONDS);
+                        if (add == null) {
+                            if (!running.get()) {
+                                break;
+                            }
+                        } else {
+                            // add to other queue before removing
+                            toRemove.add(add);
+                            memUsage.increaseUsage(add);
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        Thread removeThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    startLatch.await();
+
+                    while (true) {
+                        Integer remove = toRemove.poll(1, TimeUnit.MILLISECONDS);
+                        if (remove == null) {
+                            if (!running.get()) {
+                                break;
+                            }
+                        } else {
+                            memUsage.decreaseUsage(remove);
+                            removed.add(remove);
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        Thread waitForSpaceThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    startLatch.await();
+
+                    while (running.get()) {
+                        memUsage.waitForSpace();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        removeThread.start();
+        addThread.start();
+        if (useWaitForSpaceThread) {
+            waitForSpaceThread.start();
+        }
+
+        Random r = new Random(seed);
+
+        startLatch.countDown();
+
+        for (int i = 0; i < operations; i++) {
+            toAdd.add(r.nextInt(100) + 1);
+        }
+
+        // we expect the failure percentage to be related to the last operation
+        List<Integer> ops = new ArrayList<Integer>(operations);
+        for (int i = 0; i < operations; i++) {
+            Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
+            assertNotNull(op);
+            ops.add(op);
+        }
+
+        running.set(false);
+
+        if (useWaitForSpaceThread) {
+            try {
+                waitForSpaceThread.join(1000);
+            } catch (InterruptedException e) {
+                System.out.println("Attempt: " + attempt + " : " + memUsage + " waitForSpace never returned");
+                waitForSpaceThread.interrupt();
+                waitForSpaceThread.join();
+            }
+        }
+
+        removeThread.join();
+        addThread.join();
+
+        if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() != memUsage.getPercentUsage()) {
+            System.out.println("Attempt: " + attempt + " : " + memUsage);
+            System.out.println("Operations: " + ops);
+            assertEquals(0, memUsage.getPercentUsage());
+        }
+    }
+}

Propchange: activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native