You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/12 15:01:02 UTC

svn commit: r517222 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/memory/ src/test/java/org/apache/activemq/

Author: chirino
Date: Mon Mar 12 07:01:01 2007
New Revision: 517222

URL: http://svn.apache.org/viewvc?view=rev&rev=517222
Log:
Ported over the tests from the 4.1 branch and updated the UsageManager so that if a limit is set explicitly,
it is used instead of being computed as a percentage of the parent's limit.


Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
      - copied, changed from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
      - copied, changed from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Mon Mar 12 07:01:01 2007
@@ -265,6 +265,9 @@
 
             <!-- The NIO implemenation is not working properly on OS X.. -->
             <exclude>**/nio/**</exclude>
+
+            <exclude>**/AMQDeadlockTest3.*</exclude>
+            <exclude>**/ProducerFlowControlTest.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar 12 07:01:01 2007
@@ -102,7 +102,7 @@
             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
         this.destination = destination;
         this.usageManager = new UsageManager(memoryManager,destination.toString());
-        this.usageManager.setLimit(Long.MAX_VALUE);
+        this.usageManager.setUsagePortion(1.0f);
         this.store = store;
         if(destination.isTemporary()){
             this.messages=new VMPendingMessageCursor();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Mar 12 07:01:01 2007
@@ -76,7 +76,7 @@
         this.destination = destination;
         this.store = store; //this could be NULL! (If an advsiory)
         this.usageManager = new UsageManager(memoryManager,destination.toString());
-        this.usageManager.setLimit(Long.MAX_VALUE);
+        this.usageManager.setUsagePortion(1.0f);
         
         // Let the store know what usage manager we are using so that he can flush messages to disk
         // when usage gets high.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Mon Mar 12 07:01:01 2007
@@ -194,7 +194,8 @@
     }
 
     /**
-     * Sets the memory limit in bytes.
+     * Sets the memory limit in bytes.  Setting the limit in bytes resets the usagePortion to 0, since
+     * the limit is then no longer computed as a portion of the parent's limit.
      * 
      * When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb"
      * 
@@ -204,17 +205,49 @@
         if(percentUsageMinDelta < 0 ) {
             throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
         }
+        synchronized(usageMutex){
+            this.limit=limit;
+            this.usagePortion=0;
+        }
+        onLimitChange();
+    }
+    
+	private void onLimitChange() {
+		
+		// We may need to calculate the limit
+		if( usagePortion > 0 && parent!=null ) {
+	        synchronized(usageMutex){
+	        	limit = (long)(parent.getLimit()*usagePortion);
+	        }
+		}
+		
+		// Reset the percent currently being used.
         int percentUsage;
         synchronized(usageMutex){
-            this.limit=parent!=null?(long)(parent.limit*usagePortion):limit;
             percentUsage=caclPercentUsage();
         }
         setPercentUsage(percentUsage);
+        
+        // Let the children know that the limit has changed.  They may need to set
+        // their limits based on ours.
         for (UsageManager child:children) {
-            child.setLimit(limit);
+            child.onLimitChange();
         }
-    }
-    
+	}
+
+	public float getUsagePortion() {
+        synchronized(usageMutex){
+        	return usagePortion;
+        }
+	}
+
+	public void setUsagePortion(float usagePortion) {
+        synchronized(usageMutex){
+        	this.usagePortion = usagePortion;
+        }
+        onLimitChange();
+	}
+
     /*
     * Sets the minimum number of percentage points the usage has to change before a UsageListener
     * event is fired by the manager.
@@ -369,5 +402,6 @@
             }
         }
     }
+
 
 }

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=517222&p1=activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java&r1=511078&p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java&r2=517222
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java Mon Mar 12 07:01:01 2007
@@ -3,6 +3,11 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.ConnectionFactory;
@@ -15,7 +20,6 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -28,11 +32,6 @@
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
-import edu.emory.mathcs.backport.java.util.concurrent.Executors;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQDeadlockTest3 extends TestCase {
 

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=517222&p1=activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java&r1=511078&p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java&r2=517222
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Mon Mar 12 07:01:01 2007
@@ -1,6 +1,9 @@
 package org.apache.activemq;
 
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -14,12 +17,10 @@
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransport;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 public class ProducerFlowControlTest extends JmsTestSupport {
 	
@@ -140,7 +141,8 @@
         // Setup a destination policy where it takes only 1 message at a time.
         PolicyMap policyMap = new PolicyMap();        
         PolicyEntry policy = new PolicyEntry();
-        policy.setMemoryLimit(1);        
+        policy.setMemoryLimit(1);
+        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
         policyMap.setDefaultEntry(policy);        
         service.setDestinationPolicy(policyMap);