You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2011/12/17 08:04:24 UTC

svn commit: r1215432 - in /activemq/trunk/kahadb/src/main/java/org/apache/kahadb: page/PageFile.java util/LFUCache.java

Author: rajdavies
Date: Sat Dec 17 07:04:24 2011
New Revision: 1215432

URL: http://svn.apache.org/viewvc?rev=1215432&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3638

Added:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java
Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1215432&r1=1215431&r2=1215432&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Sat Dec 17 07:04:24 2011
@@ -16,44 +16,56 @@
  */
 package org.apache.kahadb.page;
 
-import org.apache.kahadb.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.*;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.IntrospectionSupport;
+import org.apache.kahadb.util.LFUCache;
+import org.apache.kahadb.util.LRUCache;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should 
+ * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
  * be externally synchronized.
- * 
+ * <p/>
  * The file has 3 parts:
  * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
  * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent
  * Page Space: The pages in the page file.
- * 
- * 
  */
 public class PageFile {
-    
+
     private static final String PAGEFILE_SUFFIX = ".data";
     private static final String RECOVERY_FILE_SUFFIX = ".redo";
     private static final String FREE_FILE_SUFFIX = ".free";
-    
+
     // 4k Default page size.
-    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
-    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
-    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", ""+100));;
-    private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
-    private static final int PAGE_FILE_HEADER_SIZE=1024*4;
+    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "" + 1024 * 4));
+    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", "" + 1000));
+    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", "" + 100));
+    ;
+    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
+    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
 
     // Recovery header is (long offset)
     private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
@@ -62,7 +74,7 @@ public class PageFile {
     private File directory;
     // And the file names in that directory will be based on this name.
     private final String name;
-    
+
     // File handle used for reading pages..
     private RandomAccessFile readFile;
     // File handle used for writing pages..
@@ -72,7 +84,7 @@ public class PageFile {
 
     // The size of pages
     private int pageSize = DEFAULT_PAGE_SIZE;
-    
+
     // The minimum number of space allocated to the recovery file in number of pages.
     private int recoveryFileMinPageCount = 1000;
     // The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize 
@@ -89,17 +101,17 @@ public class PageFile {
     // We keep a cache of pages recently used?
     private Map<Long, Page> pageCache;
     // The cache of recently used pages.
-    private boolean enablePageCaching=true;
+    private boolean enablePageCaching = true;
     // How many pages will we keep in the cache?
     private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
 
     // Should first log the page write to the recovery buffer? Avoids partial
     // page write failures..
-    private boolean enableRecoveryFile=true;
+    private boolean enableRecoveryFile = true;
     // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
-    private boolean enableDiskSyncs=true;
+    private boolean enableDiskSyncs = true;
     // Will writes be done in an async thread?
-    private boolean enabledWriteThread=false;
+    private boolean enabledWriteThread = false;
 
     // These are used if enableAsyncWrites==true 
     private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -107,19 +119,22 @@ public class PageFile {
     private CountDownLatch checkpointLatch;
 
     // Keeps track of writes that are being written to disk.
-    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
+    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
 
     // Keeps track of free pages.
     private final AtomicLong nextFreePageId = new AtomicLong();
     private SequenceSet freeList = new SequenceSet();
-    
+
     private AtomicLong nextTxid = new AtomicLong();
-    
+
     // Persistent settings stored in the page file. 
     private MetaData metaData;
 
     private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
 
+    private boolean useLFRUEviction = false;
+    private float LFUEvictionFactor = 0.2f;
+
     /**
      * Use to keep track of updated pages which have not yet been committed.
      */
@@ -133,8 +148,8 @@ public class PageFile {
         int length;
 
         public PageWrite(Page page, byte[] data) {
-            this.page=page;
-            current=data;
+            this.page = page;
+            current = data;
         }
 
         public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
@@ -143,10 +158,10 @@ public class PageFile {
             this.tmpFile = tmpFile;
             this.length = length;
         }
-                
+
         public void setCurrent(Page page, byte[] data) {
-            this.page=page;
-            current=data;
+            this.page = page;
+            current = data;
             currentLocation = -1;
             diskBoundLocation = -1;
         }
@@ -160,7 +175,7 @@ public class PageFile {
 
         @Override
         public String toString() {
-            return "[PageWrite:"+page.getPageId()+ "-" + page.getType()  + "]";
+            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
         }
 
         @SuppressWarnings("unchecked")
@@ -179,87 +194,100 @@ public class PageFile {
             }
             return diskBound;
         }
-        
+
         void begin() {
-           if (currentLocation != -1) {
-              diskBoundLocation = currentLocation;
-              currentLocation = -1;
-              current = null;
-           }  else {
-              diskBound = current;
-              current = null;
-              currentLocation = -1;
-           }
+            if (currentLocation != -1) {
+                diskBoundLocation = currentLocation;
+                currentLocation = -1;
+                current = null;
+            } else {
+                diskBound = current;
+                current = null;
+                currentLocation = -1;
+            }
         }
-        
+
         /**
          * @return true if there is no pending writes to do.
          */
         boolean done() {
             diskBoundLocation = -1;
-            diskBound=null;
+            diskBound = null;
             return current == null || currentLocation == -1;
         }
-        
+
         boolean isDone() {
             return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
         }
 
     }
-    
+
     /**
-     * The MetaData object hold the persistent data associated with a PageFile object. 
+     * The MetaData object hold the persistent data associated with a PageFile object.
      */
     public static class MetaData {
-        
+
         String fileType;
         String fileTypeVersion;
-        
-        long metaDataTxId=-1;
+
+        long metaDataTxId = -1;
         int pageSize;
         boolean cleanShutdown;
         long lastTxId;
         long freePages;
-        
+
         public String getFileType() {
             return fileType;
         }
+
         public void setFileType(String fileType) {
             this.fileType = fileType;
         }
+
         public String getFileTypeVersion() {
             return fileTypeVersion;
         }
+
         public void setFileTypeVersion(String version) {
             this.fileTypeVersion = version;
         }
+
         public long getMetaDataTxId() {
             return metaDataTxId;
         }
+
         public void setMetaDataTxId(long metaDataTxId) {
             this.metaDataTxId = metaDataTxId;
         }
+
         public int getPageSize() {
             return pageSize;
         }
+
         public void setPageSize(int pageSize) {
             this.pageSize = pageSize;
         }
+
         public boolean isCleanShutdown() {
             return cleanShutdown;
         }
+
         public void setCleanShutdown(boolean cleanShutdown) {
             this.cleanShutdown = cleanShutdown;
         }
+
         public long getLastTxId() {
             return lastTxId;
         }
+
         public void setLastTxId(long lastTxId) {
             this.lastTxId = lastTxId;
         }
+
         public long getFreePages() {
             return freePages;
         }
+
         public void setFreePages(long value) {
             this.freePages = value;
         }
@@ -269,37 +297,32 @@ public class PageFile {
         assertLoaded();
         return new Transaction(this);
     }
-    
+
     /**
      * Creates a PageFile in the specified directory who's data files are named by name.
-     * 
-     * @param directory
-     * @param name
      */
     public PageFile(File directory, String name) {
         this.directory = directory;
         this.name = name;
     }
-    
+
     /**
      * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
-     * 
-     * @throws IOException 
-     *         if the files cannot be deleted.
-     * @throws IllegalStateException 
-     *         if this PageFile is loaded
+     *
+     * @throws IOException           if the files cannot be deleted.
+     * @throws IllegalStateException if this PageFile is loaded
      */
     public void delete() throws IOException {
-        if( loaded.get() ) {
+        if (loaded.get()) {
             throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
         }
         delete(getMainPageFile());
         delete(getFreeFile());
         delete(getRecoveryFile());
     }
-    
+
     public void archive() throws IOException {
-        if( loaded.get() ) {
+        if (loaded.get()) {
             throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
         }
         long timestamp = System.currentTimeMillis();
@@ -313,44 +336,46 @@ public class PageFile {
      * @throws IOException
      */
     private void delete(File file) throws IOException {
-        if( file.exists() ) {
-            if( !file.delete() ) {
-                throw new IOException("Could not delete: "+file.getPath());
+        if (file.exists()) {
+            if (!file.delete()) {
+                throw new IOException("Could not delete: " + file.getPath());
             }
         }
     }
-    
+
     private void archive(File file, String suffix) throws IOException {
-        if( file.exists() ) {
+        if (file.exists()) {
             File archive = new File(file.getPath() + "-" + suffix);
-            if( !file.renameTo(archive) ) {
+            if (!file.renameTo(archive)) {
                 throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
             }
         }
     }
-    
+
     /**
-     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the 
+     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
      * first time the page file is loaded, then this creates the page file in the file system.
-     * 
-     * @throws IOException
-     *         If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if 
-     *         there was a disk error.
-     * @throws IllegalStateException 
-     *         If the page file was already loaded.
+     *
+     * @throws IOException           If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if
+     *                               there was a disk error.
+     * @throws IllegalStateException If the page file was already loaded.
      */
     public void load() throws IOException, IllegalStateException {
         if (loaded.compareAndSet(false, true)) {
-            
-            if( enablePageCaching ) {
-                pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
+
+            if (enablePageCaching) {
+                if (isUseLFRUEviction()) {
+                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
+                } else {
+                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
+                }
             }
-            
+
             File file = getMainPageFile();
             IOHelper.mkdirs(file.getParentFile());
             writeFile = new RandomAccessFile(file, "rw");
             readFile = new RandomAccessFile(file, "r");
-            
+
             if (readFile.length() > 0) {
                 // Load the page size setting cause that can't change once the file is created.
                 loadMetaData();
@@ -367,40 +392,40 @@ public class PageFile {
                 storeMetaData();
             }
 
-            if( enableRecoveryFile ) {
+            if (enableRecoveryFile) {
                 recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
             }
-            
-            if(  metaData.isCleanShutdown() ) {
-                nextTxid.set(metaData.getLastTxId()+1);
-                if( metaData.getFreePages()>0 ) {
+
+            if (metaData.isCleanShutdown()) {
+                nextTxid.set(metaData.getLastTxId() + 1);
+                if (metaData.getFreePages() > 0) {
                     loadFreeList();
-                } 
+                }
             } else {
                 LOG.debug(toString() + ", Recovering page file...");
                 nextTxid.set(redoRecoveryUpdates());
-                
+
                 // Scan all to find the free pages.
                 freeList = new SequenceSet();
-                for (Iterator i = tx().iterator(true); i.hasNext();) {
-                    Page page = (Page)i.next();
-                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
+                for (Iterator i = tx().iterator(true); i.hasNext(); ) {
+                    Page page = (Page) i.next();
+                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                         freeList.add(page.getPageId());
                     }
                 }
-                
+
             }
-            
+
             metaData.setCleanShutdown(false);
             storeMetaData();
             getFreeFile().delete();
-            
-            if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
+
+            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                 writeFile.setLength(PAGE_FILE_HEADER_SIZE);
             }
-            nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
+            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
             startWriter();
-                
+
         } else {
             throw new IllegalStateException("Cannot load the page file when it is allready loaded.");
         }
@@ -410,11 +435,9 @@ public class PageFile {
     /**
      * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
      * once unloaded, you can no longer use the page file to read or write Pages.
-     * 
-     * @throws IOException
-     *         if there was a disk error occurred while closing the down the page file.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
+     *
+     * @throws IOException           if there was a disk error occurred while closing the down the page file.
+     * @throws IllegalStateException if the PageFile is not loaded
      */
     public void unload() throws IOException {
         if (loaded.compareAndSet(true, false)) {
@@ -424,32 +447,32 @@ public class PageFile {
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
-            
-            if( freeList.isEmpty() ) {
+
+            if (freeList.isEmpty()) {
                 metaData.setFreePages(0);
             } else {
                 storeFreeList();
                 metaData.setFreePages(freeList.size());
             }
-            
-            metaData.setLastTxId( nextTxid.get()-1 );
+
+            metaData.setLastTxId(nextTxid.get() - 1);
             metaData.setCleanShutdown(true);
             storeMetaData();
-            
+
             if (readFile != null) {
                 readFile.close();
                 readFile = null;
                 writeFile.close();
-                writeFile=null;
-                if( enableRecoveryFile ) {
+                writeFile = null;
+                if (enableRecoveryFile) {
                     recoveryFile.close();
-                    recoveryFile=null;
+                    recoveryFile = null;
                 }
                 freeList.clear();
-                if( pageCache!=null ) {
-                    pageCache=null;
+                if (pageCache != null) {
+                    pageCache = null;
                 }
-                synchronized(writes) {
+                synchronized (writes) {
                     writes.clear();
                 }
             }
@@ -457,31 +480,30 @@ public class PageFile {
             throw new IllegalStateException("Cannot unload the page file when it is not loaded");
         }
     }
-        
+
     public boolean isLoaded() {
         return loaded.get();
     }
 
     /**
      * Flush and sync all write buffers to disk.
-     * 
-     * @throws IOException
-     *         If an disk error occurred.
+     *
+     * @throws IOException If an disk error occurred.
      */
     public void flush() throws IOException {
 
-        if( enabledWriteThread && stopWriter.get() ) {
+        if (enabledWriteThread && stopWriter.get()) {
             throw new IOException("Page file already stopped: checkpointing is not allowed");
         }
-        
+
         // Setup a latch that gets notified when all buffered writes hits the disk.
         CountDownLatch checkpointLatch;
-        synchronized( writes ) {
-            if( writes.isEmpty()) {                
+        synchronized (writes) {
+            if (writes.isEmpty()) {
                 return;
             }
-            if( enabledWriteThread ) {
-                if( this.checkpointLatch == null ) {
+            if (enabledWriteThread) {
+                if (this.checkpointLatch == null) {
                     this.checkpointLatch = new CountDownLatch(1);
                 }
                 checkpointLatch = this.checkpointLatch;
@@ -498,28 +520,28 @@ public class PageFile {
         }
     }
 
-    
+
     public String toString() {
-        return "Page File: "+getMainPageFile();
+        return "Page File: " + getMainPageFile();
     }
-    
+
     ///////////////////////////////////////////////////////////////////
     // Private Implementation Methods
     ///////////////////////////////////////////////////////////////////
     private File getMainPageFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
+        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
     }
-    
+
     public File getFreeFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
-    } 
+        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
+    }
 
     public File getRecoveryFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
-    } 
+        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
+    }
 
     public long toOffset(long pageId) {
-        return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
+        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
     }
 
     private void loadMetaData() throws IOException {
@@ -529,7 +551,7 @@ public class PageFile {
         MetaData v2 = new MetaData();
         try {
             Properties p = new Properties();
-            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
+            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
             readFile.seek(0);
             readFile.readFully(d);
             is = new ByteArrayInputStream(d);
@@ -538,11 +560,11 @@ public class PageFile {
         } catch (IOException e) {
             v1 = null;
         }
-        
+
         try {
             Properties p = new Properties();
-            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
-            readFile.seek(PAGE_FILE_HEADER_SIZE/2);
+            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
+            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
             readFile.readFully(d);
             is = new ByteArrayInputStream(d);
             p.load(is);
@@ -550,46 +572,46 @@ public class PageFile {
         } catch (IOException e) {
             v2 = null;
         }
-        
-        if( v1==null && v2==null ) {
+
+        if (v1 == null && v2 == null) {
             throw new IOException("Could not load page file meta data");
-        } 
-        
-        if( v1 == null || v1.metaDataTxId<0 ) {
+        }
+
+        if (v1 == null || v1.metaDataTxId < 0) {
             metaData = v2;
-        } else if( v2==null || v1.metaDataTxId<0 ) {
+        } else if (v2 == null || v1.metaDataTxId < 0) {
             metaData = v1;
-        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
+        } else if (v1.metaDataTxId == v2.metaDataTxId) {
             metaData = v1; // use the first since the 2nd could be a partial..
         } else {
             metaData = v2; // use the second cause the first is probably a partial.
         }
     }
-    
+
     private void storeMetaData() throws IOException {
         // Convert the metadata into a property format
         metaData.metaDataTxId++;
         Properties p = new Properties();
         IntrospectionSupport.getProperties(metaData, p, null);
-        
+
         ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
         p.store(os, "");
-        if( os.size() > PAGE_FILE_HEADER_SIZE/2) { 
-            throw new IOException("Configuation is to larger than: "+PAGE_FILE_HEADER_SIZE/2);
+        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
+            throw new IOException("Configuation is to larger than: " + PAGE_FILE_HEADER_SIZE / 2);
         }
         // Fill the rest with space...
-        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
-        Arrays.fill(filler, (byte)' ');
+        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
+        Arrays.fill(filler, (byte) ' ');
         os.write(filler);
         os.flush();
-        
+
         byte[] d = os.toByteArray();
 
         // So we don't loose it.. write it 2 times...
         writeFile.seek(0);
         writeFile.write(d);
         writeFile.getFD().sync();
-        writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
+        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
         writeFile.write(d);
         writeFile.getFD().sync();
     }
@@ -608,14 +630,14 @@ public class PageFile {
         freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
         dis.close();
     }
-    
+
     ///////////////////////////////////////////////////////////////////
     // Property Accessors 
     ///////////////////////////////////////////////////////////////////
-    
+
     /**
      * Is the recovery buffer used to double buffer page writes.  Enabled by default.
-     * 
+     *
      * @return is the recovery buffer enabled.
      */
     public boolean isEnableRecoveryFile() {
@@ -640,13 +662,12 @@ public class PageFile {
 
     /**
      * Allows you enable syncing writes to disk.
-     * @param syncWrites
      */
     public void setEnableDiskSyncs(boolean syncWrites) {
         assertNotLoaded();
         this.enableDiskSyncs = syncWrites;
     }
-    
+
     /**
      * @return the page size
      */
@@ -658,23 +679,22 @@ public class PageFile {
      * @return the amount of content data that a page can hold.
      */
     public int getPageContentSize() {
-        return this.pageSize-Page.PAGE_HEADER_SIZE;
+        return this.pageSize - Page.PAGE_HEADER_SIZE;
     }
-    
+
     /**
      * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
      * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
      * can no longer be changed.
-     * 
+     *
      * @param pageSize the pageSize to set
-     * @throws IllegalStateException
-     *         once the page file is loaded.
+     * @throws IllegalStateException once the page file is loaded.
      */
     public void setPageSize(int pageSize) throws IllegalStateException {
         assertNotLoaded();
         this.pageSize = pageSize;
     }
-    
+
     /**
      * @return true if read page caching is enabled
      */
@@ -717,7 +737,7 @@ public class PageFile {
     public long getDiskSize() throws IOException {
         return toOffset(nextFreePageId.get());
     }
-    
+
     /**
      * @return the number of pages allocated in the PageFile
      */
@@ -748,16 +768,32 @@ public class PageFile {
         this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
     }
 
-	public int getWriteBatchSize() {
-		return writeBatchSize;
-	}
+    public int getWriteBatchSize() {
+        return writeBatchSize;
+    }
 
-	public void setWriteBatchSize(int writeBatchSize) {
+    public void setWriteBatchSize(int writeBatchSize) {
         assertNotLoaded();
-		this.writeBatchSize = writeBatchSize;
-	}
+        this.writeBatchSize = writeBatchSize;
+    }
+
+    public float getLFUEvictionFactor() {
+        return LFUEvictionFactor;
+    }
+
+    public void setLFUEvictionFactor(float LFUEvictionFactor) {
+        this.LFUEvictionFactor = LFUEvictionFactor;
+    }
 
-	///////////////////////////////////////////////////////////////////
+    public boolean isUseLFRUEviction() {
+        return useLFRUEviction;
+    }
+
+    public void setUseLFRUEviction(boolean useLFRUEviction) {
+        this.useLFRUEviction = useLFRUEviction;
+    }
+
+    ///////////////////////////////////////////////////////////////////
     // Package Protected Methods exposed to Transaction
     ///////////////////////////////////////////////////////////////////
 
@@ -765,25 +801,24 @@ public class PageFile {
      * @throws IllegalStateException if the page file is not loaded.
      */
     void assertLoaded() throws IllegalStateException {
-        if( !loaded.get() ) {
+        if (!loaded.get()) {
             throw new IllegalStateException("PageFile is not loaded");
         }
     }
+
     void assertNotLoaded() throws IllegalStateException {
-        if( loaded.get() ) {
+        if (loaded.get()) {
             throw new IllegalStateException("PageFile is loaded");
         }
     }
-        
-    /** 
+
+    /**
      * Allocates a block of free pages that you can write data to.
-     * 
+     *
      * @param count the number of sequential pages to allocate
-     * @return the first page of the sequential set. 
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
+     * @return the first page of the sequential set.
+     * @throws IOException           If an disk error occurred.
+     * @throws IllegalStateException if the PageFile is not loaded
      */
     <T> Page<T> allocate(int count) throws IOException {
         assertLoaded();
@@ -837,17 +872,19 @@ public class PageFile {
         freeList.add(pageId);
         removeFromCache(pageId);
     }
-    
+
     @SuppressWarnings("unchecked")
     private <T> void write(Page<T> page, byte[] data) throws IOException {
         final PageWrite write = new PageWrite(page, data);
-        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>(){
+        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
             public Long getKey() {
                 return write.getPage().getPageId();
             }
+
             public PageWrite getValue() {
                 return write;
             }
+
             public PageWrite setValue(PageWrite value) {
                 return null;
             }
@@ -857,9 +894,9 @@ public class PageFile {
     }
 
     void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
-        synchronized( writes ) {
-            if( enabledWriteThread  ) {
-                while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
+        synchronized (writes) {
+            if (enabledWriteThread) {
+                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                     try {
                         writes.wait();
                     } catch (InterruptedException e) {
@@ -875,7 +912,7 @@ public class PageFile {
                 Long key = entry.getKey();
                 PageWrite value = entry.getValue();
                 PageWrite write = writes.get(key);
-                if( write==null ) {
+                if (write == null) {
                     writes.put(key, value);
                 } else {
                     if (value.currentLocation != -1) {
@@ -887,29 +924,29 @@ public class PageFile {
                     }
                 }
             }
-            
+
             // Once we start approaching capacity, notify the writer to start writing
             // sync immediately for long txs
-            if( longTx || canStartWriteBatch() ) {
+            if (longTx || canStartWriteBatch()) {
 
-                if( enabledWriteThread  ) {
+                if (enabledWriteThread) {
                     writes.notify();
                 } else {
                     writeBatch();
                 }
             }
-        }            
+        }
     }
-    
+
     private boolean canStartWriteBatch() {
-		int capacityUsed = ((writes.size() * 100)/writeBatchSize);
-        if( enabledWriteThread ) {
+        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
+        if (enabledWriteThread) {
             // The constant 10 here controls how soon write batches start going to disk..
             // would be nice to figure out how to auto tune that value.  Make to small and
             // we reduce through put because we are locking the write mutex too often doing writes
-            return capacityUsed >= 10 || checkpointLatch!=null;
+            return capacityUsed >= 10 || checkpointLatch != null;
         } else {
-            return capacityUsed >= 80 || checkpointLatch!=null;
+            return capacityUsed >= 80 || checkpointLatch != null;
         }
     }
 
@@ -918,9 +955,9 @@ public class PageFile {
     ///////////////////////////////////////////////////////////////////
     @SuppressWarnings("unchecked")
     <T> Page<T> getFromCache(long pageId) {
-        synchronized(writes) {
+        synchronized (writes) {
             PageWrite pageWrite = writes.get(pageId);
-            if( pageWrite != null ) {
+            if (pageWrite != null) {
                 return pageWrite.page;
             }
         }
@@ -947,22 +984,23 @@ public class PageFile {
     ///////////////////////////////////////////////////////////////////
     // Internal Double write implementation follows...
     ///////////////////////////////////////////////////////////////////
+
     /**
-     * 
+     *
      */
     private void pollWrites() {
         try {
-            while( !stopWriter.get() ) {
+            while (!stopWriter.get()) {
                 // Wait for a notification...
-                synchronized( writes ) {  
+                synchronized (writes) {
                     writes.notifyAll();
-                    
+
                     // If there is not enough to write, wait for a notification...
-                    while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get() ) {
+                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                         writes.wait(100);
                     }
-                    
-                    if( writes.isEmpty() ) {
+
+                    if (writes.isEmpty()) {
                         releaseCheckpointWaiter();
                     }
                 }
@@ -975,11 +1013,11 @@ public class PageFile {
         }
     }
 
-     private void writeBatch() throws IOException {
+    private void writeBatch() throws IOException {
 
-         CountDownLatch checkpointLatch;
-         ArrayList<PageWrite> batch;
-         synchronized( writes ) {
+        CountDownLatch checkpointLatch;
+        ArrayList<PageWrite> batch;
+        synchronized (writes) {
             // If there is not enough to write, wait for a notification...
 
             batch = new ArrayList<PageWrite>(writes.size());
@@ -997,126 +1035,125 @@ public class PageFile {
             // Grab on to the existing checkpoint latch cause once we do this write we can
             // release the folks that were waiting for those writes to hit disk.
             checkpointLatch = this.checkpointLatch;
-            this.checkpointLatch=null;
-         }
+            this.checkpointLatch = null;
+        }
+
+        Checksum checksum = new Adler32();
+        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        for (PageWrite w : batch) {
+            if (enableRecoveryFile) {
+                try {
+                    checksum.update(w.getDiskBound(), 0, pageSize);
+                } catch (Throwable t) {
+                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+                }
+                recoveryFile.writeLong(w.page.getPageId());
+                recoveryFile.write(w.getDiskBound(), 0, pageSize);
+            }
+
+            writeFile.seek(toOffset(w.page.getPageId()));
+            writeFile.write(w.getDiskBound(), 0, pageSize);
+            w.done();
+        }
+
+        try {
+            if (enableRecoveryFile) {
+                // Can we shrink the recovery buffer??
+                if (recoveryPageCount > recoveryFileMaxPageCount) {
+                    int t = Math.max(recoveryFileMinPageCount, batch.size());
+                    recoveryFile.setLength(recoveryFileSizeForPages(t));
+                }
+
+                // Record the page writes in the recovery buffer.
+                recoveryFile.seek(0);
+                // Store the next tx id...
+                recoveryFile.writeLong(nextTxid.get());
+                // Store the checksum for thw write batch so that on recovery we
+                // know if we have a consistent
+                // write batch on disk.
+                recoveryFile.writeLong(checksum.getValue());
+                // Write the # of pages that will follow
+                recoveryFile.writeInt(batch.size());
+            }
+
+            if (enableDiskSyncs) {
+                // Sync to make sure recovery buffer writes land on disk..
+                recoveryFile.getFD().sync();
+                writeFile.getFD().sync();
+            }
+        } finally {
+            synchronized (writes) {
+                for (PageWrite w : batch) {
+                    // If there are no more pending writes, then remove it from
+                    // the write cache.
+                    if (w.isDone()) {
+                        writes.remove(w.page.getPageId());
+                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
+                            if (!w.tmpFile.delete()) {
+                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
+                            }
+                            tmpFilesForRemoval.remove(w.tmpFile);
+                        }
+                    }
+                }
+            }
 
-         Checksum checksum = new Adler32();
-         recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
-         for (PageWrite w : batch) {
-             if (enableRecoveryFile) {
-                 try {
-                     checksum.update(w.getDiskBound(), 0, pageSize);
-                 } catch (Throwable t) {
-                     throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
-                 }
-                 recoveryFile.writeLong(w.page.getPageId());
-                 recoveryFile.write(w.getDiskBound(), 0, pageSize);
-             }
-
-             writeFile.seek(toOffset(w.page.getPageId()));
-             writeFile.write(w.getDiskBound(), 0, pageSize);
-             w.done();
-         }
-
-         try {
-             if (enableRecoveryFile) {
-                 // Can we shrink the recovery buffer??
-                 if (recoveryPageCount > recoveryFileMaxPageCount) {
-                     int t = Math.max(recoveryFileMinPageCount, batch.size());
-                     recoveryFile.setLength(recoveryFileSizeForPages(t));
-                 }
-
-                 // Record the page writes in the recovery buffer.
-                 recoveryFile.seek(0);
-                 // Store the next tx id...
-                 recoveryFile.writeLong(nextTxid.get());
-                 // Store the checksum for thw write batch so that on recovery we
-                 // know if we have a consistent
-                 // write batch on disk.
-                 recoveryFile.writeLong(checksum.getValue());
-                 // Write the # of pages that will follow
-                 recoveryFile.writeInt(batch.size());
-             }
-
-             if (enableDiskSyncs) {
-                 // Sync to make sure recovery buffer writes land on disk..
-                 recoveryFile.getFD().sync();
-                 writeFile.getFD().sync();
-             }
-         } finally {
-             synchronized (writes) {
-                 for (PageWrite w : batch) {
-                     // If there are no more pending writes, then remove it from
-                     // the write cache.
-                     if (w.isDone()) {
-                         writes.remove(w.page.getPageId());
-                         if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
-                             if (!w.tmpFile.delete()) {
-                                 throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
-                             }
-                             tmpFilesForRemoval.remove(w.tmpFile);
-                         }
-                     }
-                 }
-             }
-
-             if (checkpointLatch != null) {
-                 checkpointLatch.countDown();
-             }
-         }
-     }
+            if (checkpointLatch != null) {
+                checkpointLatch.countDown();
+            }
+        }
+    }
 
     public void removeTmpFile(File file) {
         tmpFilesForRemoval.add(file);
     }
 
     private long recoveryFileSizeForPages(int pageCount) {
-        return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
+        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
     }
 
     private void releaseCheckpointWaiter() {
-        if( checkpointLatch!=null ) {
+        if (checkpointLatch != null) {
             checkpointLatch.countDown();
-            checkpointLatch=null;
+            checkpointLatch = null;
         }
-    }       
-    
+    }
+
     /**
-     * Inspects the recovery buffer and re-applies any 
+     * Inspects the recovery buffer and re-applies any
      * partially applied page writes.
-     * 
+     *
      * @return the next transaction id that can be used.
-     * @throws IOException
      */
     private long redoRecoveryUpdates() throws IOException {
-        if( !enableRecoveryFile ) {
+        if (!enableRecoveryFile) {
             return 0;
         }
-        recoveryPageCount=0;
-        
+        recoveryPageCount = 0;
+
         // Are we initializing the recovery file?
-        if( recoveryFile.length() == 0 ) {
+        if (recoveryFile.length() == 0) {
             // Write an empty header..
             recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
             // Preallocate the minium size for better performance.
             recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
             return 0;
         }
-        
+
         // How many recovery pages do we have in the recovery buffer?
         recoveryFile.seek(0);
         long nextTxId = recoveryFile.readLong();
         long expectedChecksum = recoveryFile.readLong();
         int pageCounter = recoveryFile.readInt();
-        
+
         recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
         Checksum checksum = new Adler32();
         LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
         try {
             for (int i = 0; i < pageCounter; i++) {
                 long offset = recoveryFile.readLong();
-                byte []data = new byte[pageSize];
-                if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
+                byte[] data = new byte[pageSize];
+                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                     // Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
                     return nextTxId;
                 }
@@ -1129,28 +1166,28 @@ public class PageFile {
             LOG.debug("Redo buffer was not fully intact: ", e);
             return nextTxId;
         }
-        
+
         recoveryPageCount = pageCounter;
-        
+
         // If the checksum is not valid then the recovery buffer was partially written to disk.
-        if( checksum.getValue() != expectedChecksum ) {
+        if (checksum.getValue() != expectedChecksum) {
             return nextTxId;
         }
-        
+
         // Re-apply all the writes in the recovery buffer.
         for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
             writeFile.seek(toOffset(e.getKey()));
             writeFile.write(e.getValue());
         }
-        
+
         // And sync it to disk
         writeFile.getFD().sync();
         return nextTxId;
     }
 
     private void startWriter() {
-        synchronized( writes ) {
-            if( enabledWriteThread ) {
+        synchronized (writes) {
+            if (enabledWriteThread) {
                 stopWriter.set(false);
                 writerThread = new Thread("KahaDB Page Writer") {
                     @Override
@@ -1164,17 +1201,17 @@ public class PageFile {
             }
         }
     }
- 
+
     private void stopWriter() throws InterruptedException {
-        if( enabledWriteThread ) {
+        if (enabledWriteThread) {
             stopWriter.set(true);
             writerThread.join();
         }
     }
 
-	public File getFile() {
-		return getMainPageFile();
-	}
+    public File getFile() {
+        return getMainPageFile();
+    }
 
     public File getDirectory() {
         return directory;

Added: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java?rev=1215432&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java (added)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java Sat Dec 17 07:04:24 2011
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * LFU cache implementation based on http://dhruvbird.com/lfu.pdf, with some notable differences:
+ * <ul>
+ * <li>
+ * Frequency list is stored as an array with no next/prev pointers between nodes: looping over the array should be faster and more CPU-cache friendly than
+ * using an ad-hoc linked-pointers structure.
+ * </li>
+ * <li>
+ * The max frequency is capped at the cache size to avoid creating more and more frequency list entries, and all elements residing in the max frequency entry
+ * are re-positioned in the frequency entry linked set in order to put most recently accessed elements ahead of less recently ones,
+ * which will be collected sooner.
+ * </li>
+ * <li>
+ * The eviction factor determines how many elements (more specifically, the percentage of) will be evicted.
+ * </li>
+ * </ul>
+ * As a consequence, this cache runs in *amortized* O(1) time (considering the worst case of having the lowest frequency at 0 and having to evict all
+ * elements).
+ *
+ * @author Sergio Bossa
+ */
+public class LFUCache<Key, Value> implements Map<Key, Value> {
+
+    private final Map<Key, CacheNode<Key, Value>> cache;
+    private final LinkedHashSet[] frequencyList;
+    private int lowestFrequency;
+    private int maxFrequency;
+    //
+    private final int maxCacheSize;
+    private final float evictionFactor;
+
+    public LFUCache(int maxCacheSize, float evictionFactor) {
+        if (evictionFactor <= 0 || evictionFactor >= 1) {
+            throw new IllegalArgumentException("Eviction factor must be greater than 0 and lesser than or equal to 1");
+        }
+        this.cache = new HashMap<Key, CacheNode<Key, Value>>(maxCacheSize);
+        this.frequencyList = new LinkedHashSet[maxCacheSize];
+        this.lowestFrequency = 0;
+        this.maxFrequency = maxCacheSize - 1;
+        this.maxCacheSize = maxCacheSize;
+        this.evictionFactor = evictionFactor;
+        initFrequencyList();
+    }
+
+    public Value put(Key k, Value v) {
+        Value oldValue = null;
+        CacheNode<Key, Value> currentNode = cache.get(k);
+        if (currentNode == null) {
+            if (cache.size() == maxCacheSize) {
+                doEviction();
+            }
+            LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[0];
+            currentNode = new CacheNode(k, v, 0);
+            nodes.add(currentNode);
+            cache.put(k, currentNode);
+            lowestFrequency = 0;
+        } else {
+            oldValue = currentNode.v;
+            currentNode.v = v;
+        }
+        return oldValue;
+    }
+
+
+    public void putAll(Map<? extends Key, ? extends Value> map) {
+        for (Map.Entry<? extends Key, ? extends Value> me : map.entrySet()) {
+            put(me.getKey(), me.getValue());
+        }
+    }
+
+    public Value get(Object k) {
+        CacheNode<Key, Value> currentNode = cache.get(k);
+        if (currentNode != null) {
+            int currentFrequency = currentNode.frequency;
+            if (currentFrequency < maxFrequency) {
+                int nextFrequency = currentFrequency + 1;
+                LinkedHashSet<CacheNode<Key, Value>> currentNodes = frequencyList[currentFrequency];
+                LinkedHashSet<CacheNode<Key, Value>> newNodes = frequencyList[nextFrequency];
+                moveToNextFrequency(currentNode, nextFrequency, currentNodes, newNodes);
+                cache.put((Key) k, currentNode);
+                if (lowestFrequency == currentFrequency && currentNodes.isEmpty()) {
+                    lowestFrequency = nextFrequency;
+                }
+            } else {
+                // Hybrid with LRU: put most recently accessed ahead of others:
+                LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[currentFrequency];
+                nodes.remove(currentNode);
+                nodes.add(currentNode);
+            }
+            return currentNode.v;
+        } else {
+            return null;
+        }
+    }
+
+    public Value remove(Object k) {
+        CacheNode<Key, Value> currentNode = cache.remove(k);
+        if (currentNode != null) {
+            LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[currentNode.frequency];
+            nodes.remove(currentNode);
+            if (lowestFrequency == currentNode.frequency) {
+                findNextLowestFrequency();
+            }
+            return currentNode.v;
+        } else {
+            return null;
+        }
+    }
+
+    public int frequencyOf(Key k) {
+        CacheNode<Key, Value> node = cache.get(k);
+        if (node != null) {
+            return node.frequency + 1;
+        } else {
+            return 0;
+        }
+    }
+
+    public void clear() {
+        for (int i = 0; i <= maxFrequency; i++) {
+            frequencyList[i].clear();
+        }
+        cache.clear();
+        lowestFrequency = 0;
+    }
+
+    public Set<Key> keySet() {
+        return this.cache.keySet();
+    }
+
+    public Collection<Value> values() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Set<Entry<Key, Value>> entrySet() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public int size() {
+        return cache.size();
+    }
+
+    public boolean isEmpty() {
+        return this.cache.isEmpty();
+    }
+
+    public boolean containsKey(Object o) {
+        return this.cache.containsKey(o);
+    }
+
+    public boolean containsValue(Object o) {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    private void initFrequencyList() {
+        for (int i = 0; i <= maxFrequency; i++) {
+            frequencyList[i] = new LinkedHashSet<CacheNode<Key, Value>>();
+        }
+    }
+
+    private void doEviction() {
+        int currentlyDeleted = 0;
+        float target = maxCacheSize * evictionFactor;
+        while (currentlyDeleted < target) {
+            LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[lowestFrequency];
+            if (nodes.isEmpty()) {
+                throw new IllegalStateException("Lowest frequency constraint violated!");
+            } else {
+                Iterator<CacheNode<Key, Value>> it = nodes.iterator();
+                while (it.hasNext() && currentlyDeleted++ < target) {
+                    CacheNode<Key, Value> node = it.next();
+                    it.remove();
+                    cache.remove(node.k);
+                }
+                if (!it.hasNext()) {
+                    findNextLowestFrequency();
+                }
+            }
+        }
+    }
+
+    private void moveToNextFrequency(CacheNode<Key, Value> currentNode, int nextFrequency, LinkedHashSet<CacheNode<Key, Value>> currentNodes, LinkedHashSet<CacheNode<Key, Value>> newNodes) {
+        currentNodes.remove(currentNode);
+        newNodes.add(currentNode);
+        currentNode.frequency = nextFrequency;
+    }
+
+    private void findNextLowestFrequency() {
+        while (lowestFrequency <= maxFrequency && frequencyList[lowestFrequency].isEmpty()) {
+            lowestFrequency++;
+        }
+        if (lowestFrequency > maxFrequency) {
+            lowestFrequency = 0;
+        }
+    }
+
+    private static class CacheNode<Key, Value> {
+
+        public final Key k;
+        public Value v;
+        public int frequency;
+
+        public CacheNode(Key k, Value v, int frequency) {
+            this.k = k;
+            this.v = v;
+            this.frequency = frequency;
+        }
+
+    }
+}
\ No newline at end of file