You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/12 15:01:02 UTC
svn commit: r517222 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/broker/region/
src/main/java/org/apache/activemq/memory/ src/test/java/org/apache/activemq/
Author: chirino
Date: Mon Mar 12 07:01:01 2007
New Revision: 517222
URL: http://svn.apache.org/viewvc?view=rev&rev=517222
Log:
Ported over tests from the 4.1 branch and updated the UsageManager so that, if a limit is set
explicitly, that limit is used instead of being computed as a percentage of the parent's limit.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
- copied, changed from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
- copied, changed from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Mon Mar 12 07:01:01 2007
@@ -265,6 +265,9 @@
<!-- The NIO implemenation is not working properly on OS X.. -->
<exclude>**/nio/**</exclude>
+
+ <exclude>**/AMQDeadlockTest3.*</exclude>
+ <exclude>**/ProducerFlowControlTest.*</exclude>
</excludes>
</configuration>
</plugin>
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar 12 07:01:01 2007
@@ -102,7 +102,7 @@
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
this.usageManager = new UsageManager(memoryManager,destination.toString());
- this.usageManager.setLimit(Long.MAX_VALUE);
+ this.usageManager.setUsagePortion(1.0f);
this.store = store;
if(destination.isTemporary()){
this.messages=new VMPendingMessageCursor();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Mar 12 07:01:01 2007
@@ -76,7 +76,7 @@
this.destination = destination;
this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager,destination.toString());
- this.usageManager.setLimit(Long.MAX_VALUE);
+ this.usageManager.setUsagePortion(1.0f);
// Let the store know what usage manager we are using so that he can flush messages to disk
// when usage gets high.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=517222&r1=517221&r2=517222
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Mon Mar 12 07:01:01 2007
@@ -194,7 +194,8 @@
}
/**
- * Sets the memory limit in bytes.
+ * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since
+ * the UsageManager is not going to be portion based off the parent.
*
* When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb"
*
@@ -204,17 +205,49 @@
if(percentUsageMinDelta < 0 ) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
+ synchronized(usageMutex){
+ this.limit=limit;
+ this.usagePortion=0;
+ }
+ onLimitChange();
+ }
+
+ private void onLimitChange() {
+
+ // We may need to calculate the limit
+ if( usagePortion > 0 && parent!=null ) {
+ synchronized(usageMutex){
+ limit = (long)(parent.getLimit()*usagePortion);
+ }
+ }
+
+ // Reset the percent currently being used.
int percentUsage;
synchronized(usageMutex){
- this.limit=parent!=null?(long)(parent.limit*usagePortion):limit;
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
+
+ // Let the children know that the limit has changed. They may need to set
+ // their limits based on ours.
for (UsageManager child:children) {
- child.setLimit(limit);
+ child.onLimitChange();
}
- }
-
+ }
+
+ public float getUsagePortion() {
+ synchronized(usageMutex){
+ return usagePortion;
+ }
+ }
+
+ public void setUsagePortion(float usagePortion) {
+ synchronized(usageMutex){
+ this.usagePortion = usagePortion;
+ }
+ onLimitChange();
+ }
+
/*
* Sets the minimum number of percentage points the usage has to change before a UsageListener
* event is fired by the manager.
@@ -369,5 +402,6 @@
}
}
}
+
}
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=517222&p1=activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java&r1=511078&p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java&r2=517222
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java Mon Mar 12 07:01:01 2007
@@ -3,6 +3,11 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
@@ -15,7 +20,6 @@
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -28,11 +32,6 @@
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
-import edu.emory.mathcs.backport.java.util.concurrent.Executors;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
public class AMQDeadlockTest3 extends TestCase {
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (from r511078, activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=517222&p1=activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java&r1=511078&p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java&r2=517222
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Mon Mar 12 07:01:01 2007
@@ -1,6 +1,9 @@
package org.apache.activemq;
import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -14,12 +17,10 @@
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ProducerFlowControlTest extends JmsTestSupport {
@@ -140,7 +141,8 @@
// Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
- policy.setMemoryLimit(1);
+ policy.setMemoryLimit(1);
+ policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
policyMap.setDefaultEntry(policy);
service.setDestinationPolicy(policyMap);