You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/15 21:56:22 UTC
svn commit: r475416 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
kaha/impl/container/ListContainerImpl.java
store/kahadaptor/KahaMessageStore.java
store/kahadaptor/KahaPersistenceAdapter.java util/LRUCache.java
Author: rajdavies
Date: Wed Nov 15 12:56:21 2006
New Revision: 475416
URL: http://svn.apache.org/viewvc?view=rev&rev=475416
Log:
change Queue message store in Kaha store adaptor to use memory efficient list instead
of Map containers
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Wed Nov 15 12:56:21 2006
@@ -47,6 +47,7 @@
protected int offset=0;
protected int maximumCacheSize=100;
protected IndexItem lastCached;
+ protected boolean cacheEnabled = true;
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
String indexType) throws IOException{
@@ -858,46 +859,51 @@
}
protected void itemAdded(IndexItem item,int pos,Object value){
- int cachePosition=pos-offset;
- // if pos is before the cache offset
- // we need to clear the cache
- if(pos<offset){
- clearCache();
- }
- if(cacheList.isEmpty()){
- offset=pos;
- cacheList.add(value);
- lastCached=item;
- }else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
- cacheList.add(value);
- lastCached=item;
- }else if(cachePosition>=0&&cachePosition<=cacheList.size()){
- cacheList.add(cachePosition,value);
- if(cacheList.size()>maximumCacheSize){
- itemRemoved(cacheList.size()-1);
+ if(cacheEnabled){
+ int cachePosition=pos-offset;
+ // if pos is before the cache offset
+ // we need to clear the cache
+ if(pos<offset){
+ clearCache();
+ }
+ if(cacheList.isEmpty()){
+ offset=pos;
+ cacheList.add(value);
+ lastCached=item;
+ }else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
+ cacheList.add(value);
+ lastCached=item;
+ }else if(cachePosition>=0&&cachePosition<=cacheList.size()){
+ cacheList.add(cachePosition,value);
+ if(cacheList.size()>maximumCacheSize){
+ itemRemoved(cacheList.size()-1);
+ }
}
}
}
protected void itemRemoved(int pos){
- int lastPosition=offset+cacheList.size()-1;
- int cachePosition=pos-offset;
- if(cachePosition>=0&&cachePosition<cacheList.size()){
- if(cachePosition==lastPosition){
- if(lastCached!=null){
- lastCached=indexList.getPrevEntry(lastCached);
+ if(cacheEnabled){
+ int lastPosition=offset+cacheList.size()-1;
+ int cachePosition=pos-offset;
+ if(cachePosition>=0&&cachePosition<cacheList.size()){
+ if(cachePosition==lastPosition){
+ if(lastCached!=null){
+ lastCached=indexList.getPrevEntry(lastCached);
+ }
+ }
+ cacheList.remove(pos);
+ if(cacheList.isEmpty()){
+ clearCache();
}
- }
- cacheList.remove(pos);
- if(cacheList.isEmpty()){
- clearCache();
}
}
}
protected Object getCachedItem(int pos){
- int cachePosition=pos-offset;
Object result=null;
+ if(cacheEnabled) {
+ int cachePosition=pos-offset;
if(cachePosition>=0&&cachePosition<cacheList.size()){
result=cacheList.get(cachePosition);
}
@@ -928,6 +934,12 @@
}
}
}
+ }else {
+ IndexItem item=indexList.get(pos);
+ if(item!=null){
+ result=getValue(item);
+ }
+ }
return result;
}
@@ -980,6 +992,10 @@
*/
public synchronized void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
+ cacheEnabled = maximumCacheSize >= 0;
+ if (!cacheEnabled) {
+ clearCache();
+ }
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Nov 15 12:56:21 2006
@@ -19,16 +19,17 @@
import java.io.IOException;
import java.util.Iterator;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.util.LRUCache;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
*
@@ -36,50 +37,84 @@
*/
public class KahaMessageStore implements MessageStore{
protected final ActiveMQDestination destination;
- protected final MapContainer messageContainer;
+ protected final ListContainer messageContainer;
+ protected final LRUCache cache;
- public KahaMessageStore(MapContainer container,ActiveMQDestination destination) throws IOException{
+ public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
this.messageContainer=container;
this.destination=destination;
+ this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false);
+ // populate the cache
+ StoreEntry entry=messageContainer.getFirst();
+ int count = 0;
+ if(entry!=null){
+ do{
+ Message msg = (Message)messageContainer.get(entry);
+ cache.put(msg.getMessageId(),entry);
+ entry = messageContainer.getNext(entry);
+ count++;
+ }while(entry!=null && count < maximumCacheSize);
+ }
}
public Object getId(){
return messageContainer.getId();
}
- public void addMessage(ConnectionContext context,Message message) throws IOException{
- messageContainer.put(message.getMessageId().toString(),message);
+ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+ StoreEntry item = messageContainer.placeLast(message);
+ cache.put(message.getMessageId(),item);
}
- public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+ public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
- messageContainer.put(messageId.toString(),messageRef);
+ throw new RuntimeException("Not supported");
}
- public Message getMessage(MessageId identity) throws IOException{
- return (Message) messageContainer.get(identity.toString());
+ public synchronized Message getMessage(MessageId identity) throws IOException{
+ Message result=null;
+ StoreEntry entry=(StoreEntry)cache.remove(identity);
+ if(entry!=null){
+ result = (Message)messageContainer.get(entry);
+ }else{
+
+ for(Iterator i=messageContainer.iterator();i.hasNext();){
+ Message msg=(Message)i.next();
+ if(msg.getMessageId().equals(identity)){
+ result=msg;
+ break;
+ }
+ }
+ }
+ return result;
}
public String getMessageReference(MessageId identity) throws IOException{
- return (String) messageContainer.get(identity.toString());
+ return null;
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
- messageContainer.remove(ack.getLastMessageId().toString());
+ removeMessage(ack.getLastMessageId());
}
- public void removeMessage(MessageId msgId) throws IOException{
- messageContainer.remove(msgId.toString());
+ public synchronized void removeMessage(MessageId msgId) throws IOException{
+ StoreEntry entry=(StoreEntry)cache.remove(msgId);
+ if(entry!=null){
+ messageContainer.remove(entry);
+ }else{
+ for(Iterator i=messageContainer.iterator();i.hasNext();){
+ Message msg=(Message)i.next();
+ if(msg.getMessageId().equals(msgId)){
+ i.remove();
+ break;
+ }
+ }
+ }
}
- public void recover(MessageRecoveryListener listener) throws Exception{
- for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
- Object msg=(Object) iter.next();
- if(msg.getClass()==String.class){
- listener.recoverMessageReference((String) msg);
- }else{
- listener.recoverMessage((Message) msg);
- }
+ public synchronized void recover(MessageRecoveryListener listener) throws Exception{
+ for(Iterator iter=messageContainer.iterator();iter.hasNext();){
+ listener.recoverMessage((Message)iter.next());
}
listener.finished();
}
@@ -88,16 +123,18 @@
public void stop() {}
- public void removeAllMessages(ConnectionContext context) throws IOException{
+ public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
+ cache.clear();
}
public ActiveMQDestination getDestination(){
return destination;
}
- public void delete(){
+ public synchronized void delete(){
messageContainer.clear();
+ cache.clear();
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Wed Nov 15 12:56:21 2006
@@ -1,19 +1,15 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
@@ -59,6 +55,7 @@
private boolean useExternalMessageReferences;
private OpenWireFormat wireFormat=new OpenWireFormat();
private long maxDataFileLength=32*1024*1024;
+ private int maximumDestinationCacheSize=2000;
private String indexType=IndexTypes.DISK_INDEX;
private File dir;
private Store theStore;
@@ -68,6 +65,8 @@
dir.mkdirs();
}
this.dir=dir;
+ wireFormat.setCacheEnabled(false);
+ wireFormat.setTightEncodingEnabled(true);
}
public Set getDestinations(){
@@ -89,7 +88,7 @@
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
MessageStore rc=(MessageStore)queues.get(destination);
if(rc==null){
- rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
+ rc=new KahaMessageStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
@@ -185,10 +184,11 @@
container.load();
return container;
}
-
+
protected ListContainer getListContainer(Object id,String containerName) throws IOException{
Store store=getStore();
ListContainer container=store.getListContainer(id,containerName);
+ container.setMaximumCacheSize(0);
if(useExternalMessageReferences){
container.setMarshaller(new StringMarshaller());
}else{
@@ -199,9 +199,7 @@
}
/**
- * @param usageManager
- * The UsageManager that is controlling the broker's memory
- * usage.
+ * @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager){
}
@@ -214,8 +212,7 @@
}
/**
- * @param maxDataFileLength
- * the maxDataFileLength to set
+ * @param maxDataFileLength the maxDataFileLength to set
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
@@ -235,6 +232,20 @@
*/
public void setIndexType(String indexType){
this.indexType=indexType;
+ }
+
+ /**
+ * @return the maximumDestinationCacheSize
+ */
+ public int getMaximumDestinationCacheSize(){
+ return this.maximumDestinationCacheSize;
+ }
+
+ /**
+ * @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
+ */
+ public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
+ this.maximumDestinationCacheSize=maximumDestinationCacheSize;
}
protected synchronized Store getStore() throws IOException{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java?view=diff&rev=475416&r1=475415&r2=475416
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java Wed Nov 15 12:56:21 2006
@@ -37,6 +37,24 @@
public LRUCache(){
super(1000,0.75f,true);
}
+
+ /**
+ * Constructs an empty <tt>LRUCache</tt> instance with the
+ * specified initial capacity, maximumCacheSize,load factor and ordering mode.
+ *
+ * @param initialCapacity the initial capacity.
+ * @param maximumCacheSize
+ * @param loadFactor the load factor.
+ * @param accessOrder the ordering mode - <tt>true</tt> for
+ * access-order, <tt>false</tt> for insertion-order.
+ * @throws IllegalArgumentException if the initial capacity is negative
+ * or the load factor is nonpositive.
+ */
+
+ public LRUCache(int initialCapacity,int maximumCacheSize,float loadFactor, boolean accessOrder) {
+ super(initialCapacity,loadFactor,accessOrder);
+ this.maxCacheSize = maximumCacheSize;
+ }