You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/04 06:56:26 UTC

svn commit: r439930 - in /incubator/activemq/trunk: activemq-core/ activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ activemq-core/src/main/java/org/apache/activemq/memory/ activemq-core/src/main/java/org/apache/activemq/network/jms...

Author: chirino
Date: Sun Sep  3 21:56:25 2006
New Revision: 439930

URL: http://svn.apache.org/viewvc?view=rev&rev=439930
Log:
Added a MemoryPropertyEditor that allows you to specify memory sizes in the xbean config like: limit="20 MB"
Upgraded the xbean maven plugin to 2.6 and the new qdox that it used did not like some of our valid inline initialization of variables, so I had
to refactor to an equivalent form that qdox did like.

http://issues.apache.org/activemq/browse/AMQ-827
http://issues.apache.org/activemq/browse/AMQ-909


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java
Modified:
    incubator/activemq/trunk/activemq-core/pom.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/assembly/src/release/conf/activemq-nojournal.xml
    incubator/activemq/trunk/assembly/src/release/conf/activemq.xml

Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Sun Sep  3 21:56:25 2006
@@ -359,7 +359,7 @@
       <plugin>
         <groupId>org.apache.xbean</groupId>
         <artifactId>maven-xbean-plugin</artifactId>
-       <version>2.5</version>
+        <version>${xbean-version}</version>
         <executions>
           <execution>
             <configuration>

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Sun Sep  3 21:56:25 2006
@@ -172,6 +172,10 @@
         return memoryLimit;
     }
 
+    /**
+     * 
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     */
     public void setMemoryLimit(long memoryLimit) {
         this.memoryLimit = memoryLimit;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Sun Sep  3 21:56:25 2006
@@ -151,7 +151,11 @@
     }
 
     /**
-     * Sets the memory limit in bytes
+     * Sets the memory limit in bytes.
+     * 
+     * When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb"
+     * 
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
     public void setLimit(long limit) {
         if(percentUsageMinDelta < 0 ) {
@@ -165,20 +169,6 @@
         setPercentUsage(percentUsage);
     }
     
-    /**
-     * Sets the memory limit in megabytes
-     */
-    public void setLimitMb(long limitMb) {
-        setLimitKb(1024 * limitMb);
-    }
-    
-    /**
-     * Sets the memory limit in kilobytes
-     */
-    public void setLimitKb(long limitKb) {
-        setLimit(1024 * limitKb);
-    }
-
     /*
     * Sets the minimum number of percentage points the usage has to change before a UsageListener
     * event is fired by the manager.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Sun Sep  3 21:56:25 2006
@@ -63,30 +63,33 @@
     protected String localPassword;
     private String name;
 
-    protected LRUCache replyToBridges = new LRUCache() {
-        /**
-         * 
-         */
-        private static final long serialVersionUID = -7446792754185879286L;
-
-        protected boolean removeEldestEntry(Map.Entry enty) {
-            if (size() > maxCacheSize) {
-                Iterator iter = entrySet().iterator();
-                Map.Entry lru = (Map.Entry) iter.next();
-                remove(lru.getKey());
-                DestinationBridge bridge = (DestinationBridge) lru.getValue();
-                try {
-                    bridge.stop();
-                    log.info("Expired bridge: " + bridge);
-                }
-                catch (Exception e) {
-                    log.warn("stopping expired bridge" + bridge + " caused an exception", e);
-                }
-            }
-            return false;
-        }
-    };
+    protected LRUCache replyToBridges = createLRUCache(); 
+    	
+    static private LRUCache createLRUCache() { 
+    	return new LRUCache() {
+	        private static final long serialVersionUID = -7446792754185879286L;
+	
+	        protected boolean removeEldestEntry(Map.Entry enty) {
+	            if (size() > maxCacheSize) {
+	                Iterator iter = entrySet().iterator();
+	                Map.Entry lru = (Map.Entry) iter.next();
+	                remove(lru.getKey());
+	                DestinationBridge bridge = (DestinationBridge) lru.getValue();
+	                try {
+	                    bridge.stop();
+	                    log.info("Expired bridge: " + bridge);
+	                }
+	                catch (Exception e) {
+	                    log.warn("stopping expired bridge" + bridge + " caused an exception", e);
+	                }
+	            }
+	            return false;
+	        }
+	    };
+    }
 
+    /**
+     */
     public boolean init() {
         boolean result = initialized.compareAndSet(false, true);
         if (result) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java Sun Sep  3 21:56:25 2006
@@ -82,6 +82,9 @@
         return journalLogFileSize;
     }
 
+    /**
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     */
     public void setJournalLogFileSize(int journalLogFileSize) {
         this.journalLogFileSize = journalLogFileSize;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Sun Sep  3 21:56:25 2006
@@ -107,13 +107,17 @@
     
     private AtomicBoolean started = new AtomicBoolean(false);
 
-    private final Runnable periodicCheckpointTask = new Runnable() {
-        public void run() {
-            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
-                checkpoint(false, true);
-            }
-        }
-    };
+    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); 
+    	
+    final Runnable createPeriodicCheckpointTask() {
+    	return new Runnable() {
+	        public void run() {
+	            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+	                checkpoint(false, true);
+	            }
+	        }
+	    };
+    }
     
     public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java Sun Sep  3 21:56:25 2006
@@ -106,13 +106,17 @@
     
     private AtomicBoolean started = new AtomicBoolean(false);
 
-    private final Runnable periodicCheckpointTask = new Runnable() {
-        public void run() {
-            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
-                checkpoint(false, true);
-            }
-        }
-    };
+    private final Runnable periodicCheckpointTask  = createPeriodicCheckpointTask(); 
+    	
+    final Runnable createPeriodicCheckpointTask() {
+    	return new Runnable() {
+	        public void run() {
+	            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+	                checkpoint(false, true);
+	            }
+	        }
+	    };
+    }
     
     public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Sun Sep  3 21:56:25 2006
@@ -188,6 +188,8 @@
 
     /**
      * @param maxDataFileLength the maxDataFileLength to set
+     * 
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
     public void setMaxDataFileLength(long maxDataFileLength){
         this.maxDataFileLength=maxDataFileLength;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Sun Sep  3 21:56:25 2006
@@ -114,13 +114,17 @@
     private boolean useExternalMessageReferences;
 
 
-    private final Runnable periodicCheckpointTask = new Runnable() {
-        public void run() {
-            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
-                checkpoint(false, true);
-            }
-        }
-    };
+    private final Runnable periodicCheckpointTask  = createPeriodicCheckpointTask(); 
+    	
+    final Runnable createPeriodicCheckpointTask() {
+    	return new Runnable() {
+    		public void run() {
+	            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+	                checkpoint(false, true);
+	            }
+	        }
+	    };
+    }
     
     public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Sun Sep  3 21:56:25 2006
@@ -80,55 +80,59 @@
     private long reconnectDelay = initialReconnectDelay;
     private Exception connectionFailure;
 
-    private final TransportListener myTransportListener = new TransportListener() {
-        public void onCommand(Command command) {
-            if (command == null) {
-                return;
-            }
-            if (command.isResponse()) {
-                requestMap.remove(new Integer(((Response) command).getCorrelationId()));
-            }
-            if (!initialized){
-                if (command.isBrokerInfo()){
-                    BrokerInfo info = (BrokerInfo)command;
-                    BrokerInfo[] peers = info.getPeerBrokerInfos();
-                    if (peers!= null){
-                        for (int i =0; i < peers.length;i++){
-                            String brokerString = peers[i].getBrokerURL();
-                            add(brokerString);
-                        }
-                    }
-                initialized = true;
-                }
-                
-            }
-            if (transportListener != null) {
-                transportListener.onCommand(command);
-            }
-        }
-
-        public void onException(IOException error) {
-            try {
-                handleTransportFailure(error);
-            }
-            catch (InterruptedException e) {
-                transportListener.onException(new InterruptedIOException());
-            }
-        }
-        
-        public void transportInterupted(){
-            if (transportListener != null){
-                transportListener.transportInterupted();
-            }
-        }
-
-        public void transportResumed(){
-            if(transportListener != null){
-                transportListener.transportResumed();
-            }
-        }
-    };
-
+    private final TransportListener myTransportListener = createTransportListener();
+    
+    TransportListener createTransportListener() {
+    	return new TransportListener() {
+	        public void onCommand(Command command) {
+	            if (command == null) {
+	                return;
+	            }
+	            if (command.isResponse()) {
+	                requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+	            }
+	            if (!initialized){
+	                if (command.isBrokerInfo()){
+	                    BrokerInfo info = (BrokerInfo)command;
+	                    BrokerInfo[] peers = info.getPeerBrokerInfos();
+	                    if (peers!= null){
+	                        for (int i =0; i < peers.length;i++){
+	                            String brokerString = peers[i].getBrokerURL();
+	                            add(brokerString);
+	                        }
+	                    }
+	                initialized = true;
+	                }
+	                
+	            }
+	            if (transportListener != null) {
+	                transportListener.onCommand(command);
+	            }
+	        }
+	
+	        public void onException(IOException error) {
+	            try {
+	                handleTransportFailure(error);
+	            }
+	            catch (InterruptedException e) {
+	                transportListener.onException(new InterruptedIOException());
+	            }
+	        }
+	        
+	        public void transportInterupted(){
+	            if (transportListener != null){
+	                transportListener.transportInterupted();
+	            }
+	        }
+	
+	        public void transportResumed(){
+	            if(transportListener != null){
+	                transportListener.transportResumed();
+	            }
+	        }
+	    };
+    }
+    
     public FailoverTransport() throws InterruptedIOException {
 
         // Setup a task that is used to reconnect the a connection async.

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java?view=auto&rev=439930
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java Sun Sep  3 21:56:25 2006
@@ -0,0 +1,47 @@
+package org.apache.activemq.util;
+
+import java.beans.PropertyEditorSupport;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MemoryPropertyEditor extends PropertyEditorSupport {
+	public void setAsText(String text) throws IllegalArgumentException {
+
+		Pattern p = Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$",Pattern.CASE_INSENSITIVE);
+		Matcher m = p.matcher(text);
+		if (m.matches()) {
+			setValue(new Long(Long.parseLong(m.group(1))));
+			return;
+		}
+
+		p = Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$",Pattern.CASE_INSENSITIVE);
+		m = p.matcher(text);
+		if (m.matches()) {
+			setValue(new Long(Long.parseLong(m.group(1)) * 1024));
+			return;
+		}
+
+		p = Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE);
+		m = p.matcher(text);
+		if (m.matches()) {
+			setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 ));
+			return;
+		}
+
+		p = Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE);
+		m = p.matcher(text);
+		if (m.matches()) {
+			setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 * 1024 ));
+			return;
+		}
+
+		throw new IllegalArgumentException(
+				"Could convert not to a memory size: " + text);
+	}
+
+	public String getAsText() {
+		Long value = (Long) getValue();
+		return (value != null ? value.toString() : "");
+	}
+
+}

Modified: incubator/activemq/trunk/assembly/src/release/conf/activemq-nojournal.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/conf/activemq-nojournal.xml?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/conf/activemq-nojournal.xml (original)
+++ incubator/activemq/trunk/assembly/src/release/conf/activemq-nojournal.xml Sun Sep  3 21:56:25 2006
@@ -23,7 +23,7 @@
   <broker xmlns="http://activemq.org/config/1.0">
   
     <memoryManager>  
-      <usageManager id="memory-manager" limitMb="50"/>
+      <usageManager id="memory-manager" limit="50 MB"/>
     </memoryManager>
 
     <!-- In ActiveMQ 4, you can setup destination policies -->  

Modified: incubator/activemq/trunk/assembly/src/release/conf/activemq.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/conf/activemq.xml?view=diff&rev=439930&r1=439929&r2=439930
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/conf/activemq.xml (original)
+++ incubator/activemq/trunk/assembly/src/release/conf/activemq.xml Sun Sep  3 21:56:25 2006
@@ -22,9 +22,9 @@
   
   <broker useJmx="true" xmlns="http://activemq.org/config/1.0">
   
-    <!--  Use the following to set the broker memory limit (in bytes)
+    <!--  Use the following to set the broker memory limit 
     <memoryManager>  
-        <usageManager id="memory-manager" limit="1048576"/>
+        <usageManager id="memory-manager" limit="20 MB"/>
     </memoryManager>
     -->