You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/15 21:56:22 UTC

svn commit: r475416 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/impl/container/ListContainerImpl.java store/kahadaptor/KahaMessageStore.java store/kahadaptor/KahaPersistenceAdapter.java util/LRUCache.java

Author: rajdavies
Date: Wed Nov 15 12:56:21 2006
New Revision: 475416

URL: http://svn.apache.org/viewvc?view=rev&rev=475416
Log:
Change the Queue message store in the Kaha store adaptor to use a memory-efficient list
instead of Map containers

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Wed Nov 15 12:56:21 2006
@@ -47,6 +47,7 @@
     protected int offset=0;
     protected int maximumCacheSize=100;
     protected IndexItem lastCached;
+    protected boolean cacheEnabled = true;
 
     public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
             String indexType) throws IOException{
@@ -858,46 +859,51 @@
     }
 
     protected void itemAdded(IndexItem item,int pos,Object value){
-        int cachePosition=pos-offset;
-        // if pos is before the cache offset
-        // we need to clear the cache
-        if(pos<offset){
-            clearCache();
-        }
-        if(cacheList.isEmpty()){
-            offset=pos;
-            cacheList.add(value);
-            lastCached=item;
-        }else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
-            cacheList.add(value);
-            lastCached=item;
-        }else if(cachePosition>=0&&cachePosition<=cacheList.size()){
-            cacheList.add(cachePosition,value);
-            if(cacheList.size()>maximumCacheSize){
-                itemRemoved(cacheList.size()-1);
+        if(cacheEnabled){
+            int cachePosition=pos-offset;
+            // if pos is before the cache offset
+            // we need to clear the cache
+            if(pos<offset){
+                clearCache();
+            }
+            if(cacheList.isEmpty()){
+                offset=pos;
+                cacheList.add(value);
+                lastCached=item;
+            }else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
+                cacheList.add(value);
+                lastCached=item;
+            }else if(cachePosition>=0&&cachePosition<=cacheList.size()){
+                cacheList.add(cachePosition,value);
+                if(cacheList.size()>maximumCacheSize){
+                    itemRemoved(cacheList.size()-1);
+                }
             }
         }
     }
 
     protected void itemRemoved(int pos){
-        int lastPosition=offset+cacheList.size()-1;
-        int cachePosition=pos-offset;
-        if(cachePosition>=0&&cachePosition<cacheList.size()){
-            if(cachePosition==lastPosition){
-                if(lastCached!=null){
-                    lastCached=indexList.getPrevEntry(lastCached);
+        if(cacheEnabled){
+            int lastPosition=offset+cacheList.size()-1;
+            int cachePosition=pos-offset;
+            if(cachePosition>=0&&cachePosition<cacheList.size()){
+                if(cachePosition==lastPosition){
+                    if(lastCached!=null){
+                        lastCached=indexList.getPrevEntry(lastCached);
+                    }
+                }
+                cacheList.remove(pos);
+                if(cacheList.isEmpty()){
+                    clearCache();
                 }
-            }
-            cacheList.remove(pos);
-            if(cacheList.isEmpty()){
-                clearCache();
             }
         }
     }
 
     protected Object getCachedItem(int pos){
-        int cachePosition=pos-offset;
         Object result=null;
+        if(cacheEnabled) {
+        int cachePosition=pos-offset;
         if(cachePosition>=0&&cachePosition<cacheList.size()){
             result=cacheList.get(cachePosition);
         }
@@ -928,6 +934,12 @@
                 }
             }
         }
+        }else {
+            IndexItem item=indexList.get(pos);
+            if(item!=null){
+                result=getValue(item);
+            }
+        }
         return result;
     }
 
@@ -980,6 +992,10 @@
      */
     public synchronized void setMaximumCacheSize(int maximumCacheSize){
         this.maximumCacheSize=maximumCacheSize;
+        cacheEnabled = maximumCacheSize >= 0;
+        if (!cacheEnabled) {
+            clearCache();
+        }
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Nov 15 12:56:21 2006
@@ -19,16 +19,17 @@
 
 import java.io.IOException;
 import java.util.Iterator;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.util.LRUCache;
 /**
  * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
  * 
@@ -36,50 +37,84 @@
  */
 public class KahaMessageStore implements MessageStore{
     protected final ActiveMQDestination destination;
-    protected final MapContainer messageContainer;
+    protected final ListContainer messageContainer;
+    protected final LRUCache cache;
 
-    public KahaMessageStore(MapContainer container,ActiveMQDestination destination) throws IOException{
+    public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
         this.messageContainer=container;
         this.destination=destination;
+        this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false);
+        // populate the cache
+        StoreEntry entry=messageContainer.getFirst();
+        int count = 0;
+        if(entry!=null){
+            do{
+                Message msg = (Message)messageContainer.get(entry);
+                cache.put(msg.getMessageId(),entry);
+                entry = messageContainer.getNext(entry);
+                count++;
+            }while(entry!=null && count < maximumCacheSize);
+        }
     }
     
     public Object getId(){
         return messageContainer.getId();
     }
 
-    public void addMessage(ConnectionContext context,Message message) throws IOException{
-        messageContainer.put(message.getMessageId().toString(),message);
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        StoreEntry item = messageContainer.placeLast(message);
+        cache.put(message.getMessageId(),item);
     }
 
-    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+    public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
                     throws IOException{
-        messageContainer.put(messageId.toString(),messageRef);
+        throw new RuntimeException("Not supported");
     }
 
-    public Message getMessage(MessageId identity) throws IOException{
-        return (Message) messageContainer.get(identity.toString());
+    public synchronized Message getMessage(MessageId identity) throws IOException{
+        Message result=null;
+        StoreEntry entry=(StoreEntry)cache.remove(identity);
+        if(entry!=null){
+            result = (Message)messageContainer.get(entry);
+        }else{
+       
+        for(Iterator i=messageContainer.iterator();i.hasNext();){
+            Message msg=(Message)i.next();
+            if(msg.getMessageId().equals(identity)){
+                result=msg;
+                break;
+            }
+        }
+        }
+        return result;
     }
 
     public String getMessageReference(MessageId identity) throws IOException{
-        return (String) messageContainer.get(identity.toString());
+        return null;
     }
 
     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
-        messageContainer.remove(ack.getLastMessageId().toString());
+        removeMessage(ack.getLastMessageId());
     }
 
-    public void removeMessage(MessageId msgId) throws IOException{
-        messageContainer.remove(msgId.toString());
+    public synchronized void removeMessage(MessageId msgId) throws IOException{
+        StoreEntry entry=(StoreEntry)cache.remove(msgId);
+        if(entry!=null){
+            messageContainer.remove(entry);
+        }else{
+            for(Iterator i=messageContainer.iterator();i.hasNext();){
+                Message msg=(Message)i.next();
+                if(msg.getMessageId().equals(msgId)){
+                    i.remove();
+                    break;
+                }
+            }
+        }
     }
 
-    public void recover(MessageRecoveryListener listener) throws Exception{
-        for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
-            Object msg=(Object) iter.next();
-            if(msg.getClass()==String.class){
-                listener.recoverMessageReference((String) msg);
-            }else{
-                listener.recoverMessage((Message) msg);
-            }
+    public synchronized void recover(MessageRecoveryListener listener) throws Exception{
+        for(Iterator iter=messageContainer.iterator();iter.hasNext();){
+            listener.recoverMessage((Message)iter.next());
         }
         listener.finished();
     }
@@ -88,16 +123,18 @@
 
     public void stop() {}
 
-    public void removeAllMessages(ConnectionContext context) throws IOException{
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
         messageContainer.clear();
+        cache.clear();
     }
 
     public ActiveMQDestination getDestination(){
         return destination;
     }
 
-    public void delete(){
+    public synchronized void delete(){
         messageContainer.clear();
+        cache.clear();
     }
     
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Wed Nov 15 12:56:21 2006
@@ -1,19 +1,15 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.activemq.store.kahadaptor;
@@ -59,6 +55,7 @@
     private boolean useExternalMessageReferences;
     private OpenWireFormat wireFormat=new OpenWireFormat();
     private long maxDataFileLength=32*1024*1024;
+    private int maximumDestinationCacheSize=2000;
     private String indexType=IndexTypes.DISK_INDEX;
     private File dir;
     private Store theStore;
@@ -68,6 +65,8 @@
             dir.mkdirs();
         }
         this.dir=dir;
+        wireFormat.setCacheEnabled(false);
+        wireFormat.setTightEncodingEnabled(true);
     }
 
     public Set getDestinations(){
@@ -89,7 +88,7 @@
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
         MessageStore rc=(MessageStore)queues.get(destination);
         if(rc==null){
-            rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
+            rc=new KahaMessageStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize);
             messageStores.put(destination,rc);
             if(transactionStore!=null){
                 rc=transactionStore.proxy(rc);
@@ -185,10 +184,11 @@
         container.load();
         return container;
     }
-    
+
     protected ListContainer getListContainer(Object id,String containerName) throws IOException{
         Store store=getStore();
         ListContainer container=store.getListContainer(id,containerName);
+        container.setMaximumCacheSize(0);
         if(useExternalMessageReferences){
             container.setMarshaller(new StringMarshaller());
         }else{
@@ -199,9 +199,7 @@
     }
 
     /**
-     * @param usageManager
-     *            The UsageManager that is controlling the broker's memory
-     *            usage.
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
      */
     public void setUsageManager(UsageManager usageManager){
     }
@@ -214,8 +212,7 @@
     }
 
     /**
-     * @param maxDataFileLength
-     *            the maxDataFileLength to set
+     * @param maxDataFileLength the maxDataFileLength to set
      * 
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
@@ -235,6 +232,20 @@
      */
     public void setIndexType(String indexType){
         this.indexType=indexType;
+    }
+
+    /**
+     * @return the maximumDestinationCacheSize
+     */
+    public int getMaximumDestinationCacheSize(){
+        return this.maximumDestinationCacheSize;
+    }
+
+    /**
+     * @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
+     */
+    public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
+        this.maximumDestinationCacheSize=maximumDestinationCacheSize;
     }
 
     protected synchronized Store getStore() throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java Wed Nov 15 12:56:21 2006
@@ -37,6 +37,24 @@
     public LRUCache(){
         super(1000,0.75f,true);
     }
+    
+    /**
+     * Constructs an empty <tt>LRUCache</tt> instance with the
+     * specified initial capacity, maximumCacheSize,load factor and ordering mode.
+     *
+     * @param  initialCapacity the initial capacity.
+     * @param maximumCacheSize 
+     * @param  loadFactor      the load factor.
+     * @param  accessOrder     the ordering mode - <tt>true</tt> for
+     *         access-order, <tt>false</tt> for insertion-order.
+     * @throws IllegalArgumentException if the initial capacity is negative
+     *         or the load factor is nonpositive.
+     */
+    
+    public LRUCache(int initialCapacity,int maximumCacheSize,float loadFactor, boolean accessOrder) {
+        super(initialCapacity,loadFactor,accessOrder);
+        this.maxCacheSize = maximumCacheSize;
+    }