Posted to commits@activemq.apache.org by ra...@apache.org on 2011/12/17 08:04:24 UTC
svn commit: r1215432 - in /activemq/trunk/kahadb/src/main/java/org/apache/kahadb: page/PageFile.java util/LFUCache.java
Author: rajdavies
Date: Sat Dec 17 07:04:24 2011
New Revision: 1215432
URL: http://svn.apache.org/viewvc?rev=1215432&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3638
Added:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
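
The commit adds an optional LFU-based page cache to PageFile: two new properties, useLFRUEviction and LFUEvictionFactor, select the new LFUCache instead of the default LRUCache when the page file is loaded. A minimal sketch of how the option might be enabled programmatically follows (the directory, name and page-access code are placeholder assumptions; the setters are assumed to be called before load(), which is where the cache is created):

import java.io.File;

import org.apache.kahadb.page.PageFile;

public class LfuPageCacheExample {
    public static void main(String[] args) throws Exception {
        // Placeholder directory and name; PageFile derives the "<name>.data",
        // "<name>.redo" and "<name>.free" file names from them.
        PageFile pageFile = new PageFile(new File("target/kahadb-example"), "example");

        // New in r1215432: switch the read-page cache from LRU to LFU eviction and
        // evict roughly 20% of the cached pages each time the cache fills up.
        pageFile.setUseLFRUEviction(true);
        pageFile.setLFUEvictionFactor(0.2f);

        pageFile.load();   // builds the LFUCache-backed page cache
        try {
            // ... allocate, read and write pages through transactions from pageFile.tx() ...
        } finally {
            pageFile.unload();
        }
    }
}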
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1215432&r1=1215431&r2=1215432&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Sat Dec 17 07:04:24 2011
@@ -16,44 +16,56 @@
*/
package org.apache.kahadb.page;
-import org.apache.kahadb.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.*;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.IntrospectionSupport;
+import org.apache.kahadb.util.LFUCache;
+import org.apache.kahadb.util.LRUCache;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
+ * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
* be externally synchronized.
- *
+ * <p/>
* The file has 3 parts:
* Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
* Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent
* Page Space: The pages in the page file.
- *
- *
*/
public class PageFile {
-
+
private static final String PAGEFILE_SUFFIX = ".data";
private static final String RECOVERY_FILE_SUFFIX = ".redo";
private static final String FREE_FILE_SUFFIX = ".free";
-
+
// 4k Default page size.
- public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
- public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
- public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", ""+100));;
- private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
- private static final int PAGE_FILE_HEADER_SIZE=1024*4;
+ public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "" + 1024 * 4));
+ public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", "" + 1000));
+ public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", "" + 100));
+ ;
+ private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
+ private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
// Recovery header is (long offset)
private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
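
For reference, the layout described in the class javadoc can be made concrete with a small standalone sketch (not part of the commit; the class name is hypothetical and the constants mirror the defaults above): the first 4k of the main file is the metadata space, page N then starts at byte 4096 + N * pageSize, and the separate recovery file reserves its own 4k header followed by (pageSize + 8) bytes per buffered page.

// Standalone illustration of PageFile's on-disk arithmetic.
public final class PageFileLayout {

    static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;     // reserved metadata space
    static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4; // redo log header
    static final int DEFAULT_PAGE_SIZE = 1024 * 4;

    // Mirrors PageFile.toOffset(pageId).
    static long pageOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * DEFAULT_PAGE_SIZE);
    }

    // Mirrors the private recoveryFileSizeForPages(pageCount) helper.
    static long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((long) (DEFAULT_PAGE_SIZE + 8) * pageCount);
    }

    public static void main(String[] args) {
        System.out.println(pageOffset(0));                   // 4096
        System.out.println(pageOffset(10));                  // 45056
        System.out.println(recoveryFileSizeForPages(1000));  // 4108096
    }
}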
@@ -62,7 +74,7 @@ public class PageFile {
private File directory;
// And the file names in that directory will be based on this name.
private final String name;
-
+
// File handle used for reading pages..
private RandomAccessFile readFile;
// File handle used for writing pages..
@@ -72,7 +84,7 @@ public class PageFile {
// The size of pages
private int pageSize = DEFAULT_PAGE_SIZE;
-
+
// The minimum number of space allocated to the recovery file in number of pages.
private int recoveryFileMinPageCount = 1000;
// The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize
@@ -89,17 +101,17 @@ public class PageFile {
// We keep a cache of pages recently used?
private Map<Long, Page> pageCache;
// The cache of recently used pages.
- private boolean enablePageCaching=true;
+ private boolean enablePageCaching = true;
// How many pages will we keep in the cache?
private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
// Should first log the page write to the recovery buffer? Avoids partial
// page write failures..
- private boolean enableRecoveryFile=true;
+ private boolean enableRecoveryFile = true;
// Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
- private boolean enableDiskSyncs=true;
+ private boolean enableDiskSyncs = true;
// Will writes be done in an async thread?
- private boolean enabledWriteThread=false;
+ private boolean enabledWriteThread = false;
// These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -107,19 +119,22 @@ public class PageFile {
private CountDownLatch checkpointLatch;
// Keeps track of writes that are being written to disk.
- private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
+ private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
-
+
private AtomicLong nextTxid = new AtomicLong();
-
+
// Persistent settings stored in the page file.
private MetaData metaData;
private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
+ private boolean useLFRUEviction = false;
+ private float LFUEvictionFactor = 0.2f;
+
/**
* Use to keep track of updated pages which have not yet been committed.
*/
@@ -133,8 +148,8 @@ public class PageFile {
int length;
public PageWrite(Page page, byte[] data) {
- this.page=page;
- current=data;
+ this.page = page;
+ current = data;
}
public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
@@ -143,10 +158,10 @@ public class PageFile {
this.tmpFile = tmpFile;
this.length = length;
}
-
+
public void setCurrent(Page page, byte[] data) {
- this.page=page;
- current=data;
+ this.page = page;
+ current = data;
currentLocation = -1;
diskBoundLocation = -1;
}
@@ -160,7 +175,7 @@ public class PageFile {
@Override
public String toString() {
- return "[PageWrite:"+page.getPageId()+ "-" + page.getType() + "]";
+ return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
}
@SuppressWarnings("unchecked")
@@ -179,87 +194,100 @@ public class PageFile {
}
return diskBound;
}
-
+
void begin() {
- if (currentLocation != -1) {
- diskBoundLocation = currentLocation;
- currentLocation = -1;
- current = null;
- } else {
- diskBound = current;
- current = null;
- currentLocation = -1;
- }
+ if (currentLocation != -1) {
+ diskBoundLocation = currentLocation;
+ currentLocation = -1;
+ current = null;
+ } else {
+ diskBound = current;
+ current = null;
+ currentLocation = -1;
+ }
}
-
+
/**
* @return true if there is no pending writes to do.
*/
boolean done() {
diskBoundLocation = -1;
- diskBound=null;
+ diskBound = null;
return current == null || currentLocation == -1;
}
-
+
boolean isDone() {
return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
}
}
-
+
/**
- * The MetaData object hold the persistent data associated with a PageFile object.
+ * The MetaData object hold the persistent data associated with a PageFile object.
*/
public static class MetaData {
-
+
String fileType;
String fileTypeVersion;
-
- long metaDataTxId=-1;
+
+ long metaDataTxId = -1;
int pageSize;
boolean cleanShutdown;
long lastTxId;
long freePages;
-
+
public String getFileType() {
return fileType;
}
+
public void setFileType(String fileType) {
this.fileType = fileType;
}
+
public String getFileTypeVersion() {
return fileTypeVersion;
}
+
public void setFileTypeVersion(String version) {
this.fileTypeVersion = version;
}
+
public long getMetaDataTxId() {
return metaDataTxId;
}
+
public void setMetaDataTxId(long metaDataTxId) {
this.metaDataTxId = metaDataTxId;
}
+
public int getPageSize() {
return pageSize;
}
+
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
+
public boolean isCleanShutdown() {
return cleanShutdown;
}
+
public void setCleanShutdown(boolean cleanShutdown) {
this.cleanShutdown = cleanShutdown;
}
+
public long getLastTxId() {
return lastTxId;
}
+
public void setLastTxId(long lastTxId) {
this.lastTxId = lastTxId;
}
+
public long getFreePages() {
return freePages;
}
+
public void setFreePages(long value) {
this.freePages = value;
}
@@ -269,37 +297,32 @@ public class PageFile {
assertLoaded();
return new Transaction(this);
}
-
+
/**
* Creates a PageFile in the specified directory who's data files are named by name.
- *
- * @param directory
- * @param name
*/
public PageFile(File directory, String name) {
this.directory = directory;
this.name = name;
}
-
+
/**
* Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
- *
- * @throws IOException
- * if the files cannot be deleted.
- * @throws IllegalStateException
- * if this PageFile is loaded
+ *
+ * @throws IOException if the files cannot be deleted.
+ * @throws IllegalStateException if this PageFile is loaded
*/
public void delete() throws IOException {
- if( loaded.get() ) {
+ if (loaded.get()) {
throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
}
delete(getMainPageFile());
delete(getFreeFile());
delete(getRecoveryFile());
}
-
+
public void archive() throws IOException {
- if( loaded.get() ) {
+ if (loaded.get()) {
throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
}
long timestamp = System.currentTimeMillis();
@@ -313,44 +336,46 @@ public class PageFile {
* @throws IOException
*/
private void delete(File file) throws IOException {
- if( file.exists() ) {
- if( !file.delete() ) {
- throw new IOException("Could not delete: "+file.getPath());
+ if (file.exists()) {
+ if (!file.delete()) {
+ throw new IOException("Could not delete: " + file.getPath());
}
}
}
-
+
private void archive(File file, String suffix) throws IOException {
- if( file.exists() ) {
+ if (file.exists()) {
File archive = new File(file.getPath() + "-" + suffix);
- if( !file.renameTo(archive) ) {
+ if (!file.renameTo(archive)) {
throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
}
}
}
-
+
/**
- * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
+ * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
* first time the page file is loaded, then this creates the page file in the file system.
- *
- * @throws IOException
- * If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if
- * there was a disk error.
- * @throws IllegalStateException
- * If the page file was already loaded.
+ *
+ * @throws IOException If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if
+ * there was a disk error.
+ * @throws IllegalStateException If the page file was already loaded.
*/
public void load() throws IOException, IllegalStateException {
if (loaded.compareAndSet(false, true)) {
-
- if( enablePageCaching ) {
- pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
+
+ if (enablePageCaching) {
+ if (isUseLFRUEviction()) {
+ pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
+ } else {
+ pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
+ }
}
-
+
File file = getMainPageFile();
IOHelper.mkdirs(file.getParentFile());
writeFile = new RandomAccessFile(file, "rw");
readFile = new RandomAccessFile(file, "r");
-
+
if (readFile.length() > 0) {
// Load the page size setting cause that can't change once the file is created.
loadMetaData();
@@ -367,40 +392,40 @@ public class PageFile {
storeMetaData();
}
- if( enableRecoveryFile ) {
+ if (enableRecoveryFile) {
recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
}
-
- if( metaData.isCleanShutdown() ) {
- nextTxid.set(metaData.getLastTxId()+1);
- if( metaData.getFreePages()>0 ) {
+
+ if (metaData.isCleanShutdown()) {
+ nextTxid.set(metaData.getLastTxId() + 1);
+ if (metaData.getFreePages() > 0) {
loadFreeList();
- }
+ }
} else {
LOG.debug(toString() + ", Recovering page file...");
nextTxid.set(redoRecoveryUpdates());
-
+
// Scan all to find the free pages.
freeList = new SequenceSet();
- for (Iterator i = tx().iterator(true); i.hasNext();) {
- Page page = (Page)i.next();
- if( page.getType() == Page.PAGE_FREE_TYPE ) {
+ for (Iterator i = tx().iterator(true); i.hasNext(); ) {
+ Page page = (Page) i.next();
+ if (page.getType() == Page.PAGE_FREE_TYPE) {
freeList.add(page.getPageId());
}
}
-
+
}
-
+
metaData.setCleanShutdown(false);
storeMetaData();
getFreeFile().delete();
-
- if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
+
+ if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
writeFile.setLength(PAGE_FILE_HEADER_SIZE);
}
- nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
+ nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
startWriter();
-
+
} else {
throw new IllegalStateException("Cannot load the page file when it is allready loaded.");
}
@@ -410,11 +435,9 @@ public class PageFile {
/**
* Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
* once unloaded, you can no longer use the page file to read or write Pages.
- *
- * @throws IOException
- * if there was a disk error occurred while closing the down the page file.
- * @throws IllegalStateException
- * if the PageFile is not loaded
+ *
+ * @throws IOException if there was a disk error occurred while closing the down the page file.
+ * @throws IllegalStateException if the PageFile is not loaded
*/
public void unload() throws IOException {
if (loaded.compareAndSet(true, false)) {
@@ -424,32 +447,32 @@ public class PageFile {
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
-
- if( freeList.isEmpty() ) {
+
+ if (freeList.isEmpty()) {
metaData.setFreePages(0);
} else {
storeFreeList();
metaData.setFreePages(freeList.size());
}
-
- metaData.setLastTxId( nextTxid.get()-1 );
+
+ metaData.setLastTxId(nextTxid.get() - 1);
metaData.setCleanShutdown(true);
storeMetaData();
-
+
if (readFile != null) {
readFile.close();
readFile = null;
writeFile.close();
- writeFile=null;
- if( enableRecoveryFile ) {
+ writeFile = null;
+ if (enableRecoveryFile) {
recoveryFile.close();
- recoveryFile=null;
+ recoveryFile = null;
}
freeList.clear();
- if( pageCache!=null ) {
- pageCache=null;
+ if (pageCache != null) {
+ pageCache = null;
}
- synchronized(writes) {
+ synchronized (writes) {
writes.clear();
}
}
@@ -457,31 +480,30 @@ public class PageFile {
throw new IllegalStateException("Cannot unload the page file when it is not loaded");
}
}
-
+
public boolean isLoaded() {
return loaded.get();
}
/**
* Flush and sync all write buffers to disk.
- *
- * @throws IOException
- * If an disk error occurred.
+ *
+ * @throws IOException If an disk error occurred.
*/
public void flush() throws IOException {
- if( enabledWriteThread && stopWriter.get() ) {
+ if (enabledWriteThread && stopWriter.get()) {
throw new IOException("Page file already stopped: checkpointing is not allowed");
}
-
+
// Setup a latch that gets notified when all buffered writes hits the disk.
CountDownLatch checkpointLatch;
- synchronized( writes ) {
- if( writes.isEmpty()) {
+ synchronized (writes) {
+ if (writes.isEmpty()) {
return;
}
- if( enabledWriteThread ) {
- if( this.checkpointLatch == null ) {
+ if (enabledWriteThread) {
+ if (this.checkpointLatch == null) {
this.checkpointLatch = new CountDownLatch(1);
}
checkpointLatch = this.checkpointLatch;
@@ -498,28 +520,28 @@ public class PageFile {
}
}
-
+
public String toString() {
- return "Page File: "+getMainPageFile();
+ return "Page File: " + getMainPageFile();
}
-
+
///////////////////////////////////////////////////////////////////
// Private Implementation Methods
///////////////////////////////////////////////////////////////////
private File getMainPageFile() {
- return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
+ return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
}
-
+
public File getFreeFile() {
- return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
- }
+ return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
+ }
public File getRecoveryFile() {
- return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
- }
+ return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
+ }
public long toOffset(long pageId) {
- return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
+ return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
}
private void loadMetaData() throws IOException {
@@ -529,7 +551,7 @@ public class PageFile {
MetaData v2 = new MetaData();
try {
Properties p = new Properties();
- byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
+ byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
readFile.seek(0);
readFile.readFully(d);
is = new ByteArrayInputStream(d);
@@ -538,11 +560,11 @@ public class PageFile {
} catch (IOException e) {
v1 = null;
}
-
+
try {
Properties p = new Properties();
- byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
- readFile.seek(PAGE_FILE_HEADER_SIZE/2);
+ byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
+ readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
readFile.readFully(d);
is = new ByteArrayInputStream(d);
p.load(is);
@@ -550,46 +572,46 @@ public class PageFile {
} catch (IOException e) {
v2 = null;
}
-
- if( v1==null && v2==null ) {
+
+ if (v1 == null && v2 == null) {
throw new IOException("Could not load page file meta data");
- }
-
- if( v1 == null || v1.metaDataTxId<0 ) {
+ }
+
+ if (v1 == null || v1.metaDataTxId < 0) {
metaData = v2;
- } else if( v2==null || v1.metaDataTxId<0 ) {
+ } else if (v2 == null || v1.metaDataTxId < 0) {
metaData = v1;
- } else if( v1.metaDataTxId==v2.metaDataTxId ) {
+ } else if (v1.metaDataTxId == v2.metaDataTxId) {
metaData = v1; // use the first since the 2nd could be a partial..
} else {
metaData = v2; // use the second cause the first is probably a partial.
}
}
-
+
private void storeMetaData() throws IOException {
// Convert the metadata into a property format
metaData.metaDataTxId++;
Properties p = new Properties();
IntrospectionSupport.getProperties(metaData, p, null);
-
+
ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
p.store(os, "");
- if( os.size() > PAGE_FILE_HEADER_SIZE/2) {
- throw new IOException("Configuation is to larger than: "+PAGE_FILE_HEADER_SIZE/2);
+ if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
+ throw new IOException("Configuation is to larger than: " + PAGE_FILE_HEADER_SIZE / 2);
}
// Fill the rest with space...
- byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
- Arrays.fill(filler, (byte)' ');
+ byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
+ Arrays.fill(filler, (byte) ' ');
os.write(filler);
os.flush();
-
+
byte[] d = os.toByteArray();
// So we don't loose it.. write it 2 times...
writeFile.seek(0);
writeFile.write(d);
writeFile.getFD().sync();
- writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
+ writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
writeFile.write(d);
writeFile.getFD().sync();
}
@@ -608,14 +630,14 @@ public class PageFile {
freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
dis.close();
}
-
+
///////////////////////////////////////////////////////////////////
// Property Accessors
///////////////////////////////////////////////////////////////////
-
+
/**
* Is the recovery buffer used to double buffer page writes. Enabled by default.
- *
+ *
* @return is the recovery buffer enabled.
*/
public boolean isEnableRecoveryFile() {
@@ -640,13 +662,12 @@ public class PageFile {
/**
* Allows you enable syncing writes to disk.
- * @param syncWrites
*/
public void setEnableDiskSyncs(boolean syncWrites) {
assertNotLoaded();
this.enableDiskSyncs = syncWrites;
}
-
+
/**
* @return the page size
*/
@@ -658,23 +679,22 @@ public class PageFile {
* @return the amount of content data that a page can hold.
*/
public int getPageContentSize() {
- return this.pageSize-Page.PAGE_HEADER_SIZE;
+ return this.pageSize - Page.PAGE_HEADER_SIZE;
}
-
+
/**
* Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
* subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
* can no longer be changed.
- *
+ *
* @param pageSize the pageSize to set
- * @throws IllegalStateException
- * once the page file is loaded.
+ * @throws IllegalStateException once the page file is loaded.
*/
public void setPageSize(int pageSize) throws IllegalStateException {
assertNotLoaded();
this.pageSize = pageSize;
}
-
+
/**
* @return true if read page caching is enabled
*/
@@ -717,7 +737,7 @@ public class PageFile {
public long getDiskSize() throws IOException {
return toOffset(nextFreePageId.get());
}
-
+
/**
* @return the number of pages allocated in the PageFile
*/
@@ -748,16 +768,32 @@ public class PageFile {
this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
}
- public int getWriteBatchSize() {
- return writeBatchSize;
- }
+ public int getWriteBatchSize() {
+ return writeBatchSize;
+ }
- public void setWriteBatchSize(int writeBatchSize) {
+ public void setWriteBatchSize(int writeBatchSize) {
assertNotLoaded();
- this.writeBatchSize = writeBatchSize;
- }
+ this.writeBatchSize = writeBatchSize;
+ }
+
+ public float getLFUEvictionFactor() {
+ return LFUEvictionFactor;
+ }
+
+ public void setLFUEvictionFactor(float LFUEvictionFactor) {
+ this.LFUEvictionFactor = LFUEvictionFactor;
+ }
- ///////////////////////////////////////////////////////////////////
+ public boolean isUseLFRUEviction() {
+ return useLFRUEviction;
+ }
+
+ public void setUseLFRUEviction(boolean useLFRUEviction) {
+ this.useLFRUEviction = useLFRUEviction;
+ }
+
+ ///////////////////////////////////////////////////////////////////
// Package Protected Methods exposed to Transaction
///////////////////////////////////////////////////////////////////
@@ -765,25 +801,24 @@ public class PageFile {
* @throws IllegalStateException if the page file is not loaded.
*/
void assertLoaded() throws IllegalStateException {
- if( !loaded.get() ) {
+ if (!loaded.get()) {
throw new IllegalStateException("PageFile is not loaded");
}
}
+
void assertNotLoaded() throws IllegalStateException {
- if( loaded.get() ) {
+ if (loaded.get()) {
throw new IllegalStateException("PageFile is loaded");
}
}
-
- /**
+
+ /**
* Allocates a block of free pages that you can write data to.
- *
+ *
* @param count the number of sequential pages to allocate
- * @return the first page of the sequential set.
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
+ * @return the first page of the sequential set.
+ * @throws IOException If an disk error occurred.
+ * @throws IllegalStateException if the PageFile is not loaded
*/
<T> Page<T> allocate(int count) throws IOException {
assertLoaded();
@@ -837,17 +872,19 @@ public class PageFile {
freeList.add(pageId);
removeFromCache(pageId);
}
-
+
@SuppressWarnings("unchecked")
private <T> void write(Page<T> page, byte[] data) throws IOException {
final PageWrite write = new PageWrite(page, data);
- Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>(){
+ Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
public Long getKey() {
return write.getPage().getPageId();
}
+
public PageWrite getValue() {
return write;
}
+
public PageWrite setValue(PageWrite value) {
return null;
}
@@ -857,9 +894,9 @@ public class PageFile {
}
void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
- synchronized( writes ) {
- if( enabledWriteThread ) {
- while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
+ synchronized (writes) {
+ if (enabledWriteThread) {
+ while (writes.size() >= writeBatchSize && !stopWriter.get()) {
try {
writes.wait();
} catch (InterruptedException e) {
@@ -875,7 +912,7 @@ public class PageFile {
Long key = entry.getKey();
PageWrite value = entry.getValue();
PageWrite write = writes.get(key);
- if( write==null ) {
+ if (write == null) {
writes.put(key, value);
} else {
if (value.currentLocation != -1) {
@@ -887,29 +924,29 @@ public class PageFile {
}
}
}
-
+
// Once we start approaching capacity, notify the writer to start writing
// sync immediately for long txs
- if( longTx || canStartWriteBatch() ) {
+ if (longTx || canStartWriteBatch()) {
- if( enabledWriteThread ) {
+ if (enabledWriteThread) {
writes.notify();
} else {
writeBatch();
}
}
- }
+ }
}
-
+
private boolean canStartWriteBatch() {
- int capacityUsed = ((writes.size() * 100)/writeBatchSize);
- if( enabledWriteThread ) {
+ int capacityUsed = ((writes.size() * 100) / writeBatchSize);
+ if (enabledWriteThread) {
// The constant 10 here controls how soon write batches start going to disk..
// would be nice to figure out how to auto tune that value. Make to small and
// we reduce through put because we are locking the write mutex too often doing writes
- return capacityUsed >= 10 || checkpointLatch!=null;
+ return capacityUsed >= 10 || checkpointLatch != null;
} else {
- return capacityUsed >= 80 || checkpointLatch!=null;
+ return capacityUsed >= 80 || checkpointLatch != null;
}
}
@@ -918,9 +955,9 @@ public class PageFile {
///////////////////////////////////////////////////////////////////
@SuppressWarnings("unchecked")
<T> Page<T> getFromCache(long pageId) {
- synchronized(writes) {
+ synchronized (writes) {
PageWrite pageWrite = writes.get(pageId);
- if( pageWrite != null ) {
+ if (pageWrite != null) {
return pageWrite.page;
}
}
@@ -947,22 +984,23 @@ public class PageFile {
///////////////////////////////////////////////////////////////////
// Internal Double write implementation follows...
///////////////////////////////////////////////////////////////////
+
/**
- *
+ *
*/
private void pollWrites() {
try {
- while( !stopWriter.get() ) {
+ while (!stopWriter.get()) {
// Wait for a notification...
- synchronized( writes ) {
+ synchronized (writes) {
writes.notifyAll();
-
+
// If there is not enough to write, wait for a notification...
- while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get() ) {
+ while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
writes.wait(100);
}
-
- if( writes.isEmpty() ) {
+
+ if (writes.isEmpty()) {
releaseCheckpointWaiter();
}
}
@@ -975,11 +1013,11 @@ public class PageFile {
}
}
- private void writeBatch() throws IOException {
+ private void writeBatch() throws IOException {
- CountDownLatch checkpointLatch;
- ArrayList<PageWrite> batch;
- synchronized( writes ) {
+ CountDownLatch checkpointLatch;
+ ArrayList<PageWrite> batch;
+ synchronized (writes) {
// If there is not enough to write, wait for a notification...
batch = new ArrayList<PageWrite>(writes.size());
@@ -997,126 +1035,125 @@ public class PageFile {
// Grab on to the existing checkpoint latch cause once we do this write we can
// release the folks that were waiting for those writes to hit disk.
checkpointLatch = this.checkpointLatch;
- this.checkpointLatch=null;
- }
+ this.checkpointLatch = null;
+ }
+
+ Checksum checksum = new Adler32();
+ recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ for (PageWrite w : batch) {
+ if (enableRecoveryFile) {
+ try {
+ checksum.update(w.getDiskBound(), 0, pageSize);
+ } catch (Throwable t) {
+ throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+ }
+ recoveryFile.writeLong(w.page.getPageId());
+ recoveryFile.write(w.getDiskBound(), 0, pageSize);
+ }
+
+ writeFile.seek(toOffset(w.page.getPageId()));
+ writeFile.write(w.getDiskBound(), 0, pageSize);
+ w.done();
+ }
+
+ try {
+ if (enableRecoveryFile) {
+ // Can we shrink the recovery buffer??
+ if (recoveryPageCount > recoveryFileMaxPageCount) {
+ int t = Math.max(recoveryFileMinPageCount, batch.size());
+ recoveryFile.setLength(recoveryFileSizeForPages(t));
+ }
+
+ // Record the page writes in the recovery buffer.
+ recoveryFile.seek(0);
+ // Store the next tx id...
+ recoveryFile.writeLong(nextTxid.get());
+ // Store the checksum for thw write batch so that on recovery we
+ // know if we have a consistent
+ // write batch on disk.
+ recoveryFile.writeLong(checksum.getValue());
+ // Write the # of pages that will follow
+ recoveryFile.writeInt(batch.size());
+ }
+
+ if (enableDiskSyncs) {
+ // Sync to make sure recovery buffer writes land on disk..
+ recoveryFile.getFD().sync();
+ writeFile.getFD().sync();
+ }
+ } finally {
+ synchronized (writes) {
+ for (PageWrite w : batch) {
+ // If there are no more pending writes, then remove it from
+ // the write cache.
+ if (w.isDone()) {
+ writes.remove(w.page.getPageId());
+ if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
+ if (!w.tmpFile.delete()) {
+ throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
+ }
+ tmpFilesForRemoval.remove(w.tmpFile);
+ }
+ }
+ }
+ }
- Checksum checksum = new Adler32();
- recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
- for (PageWrite w : batch) {
- if (enableRecoveryFile) {
- try {
- checksum.update(w.getDiskBound(), 0, pageSize);
- } catch (Throwable t) {
- throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
- }
- recoveryFile.writeLong(w.page.getPageId());
- recoveryFile.write(w.getDiskBound(), 0, pageSize);
- }
-
- writeFile.seek(toOffset(w.page.getPageId()));
- writeFile.write(w.getDiskBound(), 0, pageSize);
- w.done();
- }
-
- try {
- if (enableRecoveryFile) {
- // Can we shrink the recovery buffer??
- if (recoveryPageCount > recoveryFileMaxPageCount) {
- int t = Math.max(recoveryFileMinPageCount, batch.size());
- recoveryFile.setLength(recoveryFileSizeForPages(t));
- }
-
- // Record the page writes in the recovery buffer.
- recoveryFile.seek(0);
- // Store the next tx id...
- recoveryFile.writeLong(nextTxid.get());
- // Store the checksum for thw write batch so that on recovery we
- // know if we have a consistent
- // write batch on disk.
- recoveryFile.writeLong(checksum.getValue());
- // Write the # of pages that will follow
- recoveryFile.writeInt(batch.size());
- }
-
- if (enableDiskSyncs) {
- // Sync to make sure recovery buffer writes land on disk..
- recoveryFile.getFD().sync();
- writeFile.getFD().sync();
- }
- } finally {
- synchronized (writes) {
- for (PageWrite w : batch) {
- // If there are no more pending writes, then remove it from
- // the write cache.
- if (w.isDone()) {
- writes.remove(w.page.getPageId());
- if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
- if (!w.tmpFile.delete()) {
- throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
- }
- tmpFilesForRemoval.remove(w.tmpFile);
- }
- }
- }
- }
-
- if (checkpointLatch != null) {
- checkpointLatch.countDown();
- }
- }
- }
+ if (checkpointLatch != null) {
+ checkpointLatch.countDown();
+ }
+ }
+ }
public void removeTmpFile(File file) {
tmpFilesForRemoval.add(file);
}
private long recoveryFileSizeForPages(int pageCount) {
- return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
+ return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
}
private void releaseCheckpointWaiter() {
- if( checkpointLatch!=null ) {
+ if (checkpointLatch != null) {
checkpointLatch.countDown();
- checkpointLatch=null;
+ checkpointLatch = null;
}
- }
-
+ }
+
/**
- * Inspects the recovery buffer and re-applies any
+ * Inspects the recovery buffer and re-applies any
* partially applied page writes.
- *
+ *
* @return the next transaction id that can be used.
- * @throws IOException
*/
private long redoRecoveryUpdates() throws IOException {
- if( !enableRecoveryFile ) {
+ if (!enableRecoveryFile) {
return 0;
}
- recoveryPageCount=0;
-
+ recoveryPageCount = 0;
+
// Are we initializing the recovery file?
- if( recoveryFile.length() == 0 ) {
+ if (recoveryFile.length() == 0) {
// Write an empty header..
recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
// Preallocate the minium size for better performance.
recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
return 0;
}
-
+
// How many recovery pages do we have in the recovery buffer?
recoveryFile.seek(0);
long nextTxId = recoveryFile.readLong();
long expectedChecksum = recoveryFile.readLong();
int pageCounter = recoveryFile.readInt();
-
+
recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
Checksum checksum = new Adler32();
LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
try {
for (int i = 0; i < pageCounter; i++) {
long offset = recoveryFile.readLong();
- byte []data = new byte[pageSize];
- if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
+ byte[] data = new byte[pageSize];
+ if (recoveryFile.read(data, 0, pageSize) != pageSize) {
// Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
return nextTxId;
}
@@ -1129,28 +1166,28 @@ public class PageFile {
LOG.debug("Redo buffer was not fully intact: ", e);
return nextTxId;
}
-
+
recoveryPageCount = pageCounter;
-
+
// If the checksum is not valid then the recovery buffer was partially written to disk.
- if( checksum.getValue() != expectedChecksum ) {
+ if (checksum.getValue() != expectedChecksum) {
return nextTxId;
}
-
+
// Re-apply all the writes in the recovery buffer.
for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
writeFile.seek(toOffset(e.getKey()));
writeFile.write(e.getValue());
}
-
+
// And sync it to disk
writeFile.getFD().sync();
return nextTxId;
}
private void startWriter() {
- synchronized( writes ) {
- if( enabledWriteThread ) {
+ synchronized (writes) {
+ if (enabledWriteThread) {
stopWriter.set(false);
writerThread = new Thread("KahaDB Page Writer") {
@Override
@@ -1164,17 +1201,17 @@ public class PageFile {
}
}
}
-
+
private void stopWriter() throws InterruptedException {
- if( enabledWriteThread ) {
+ if (enabledWriteThread) {
stopWriter.set(true);
writerThread.join();
}
}
- public File getFile() {
- return getMainPageFile();
- }
+ public File getFile() {
+ return getMainPageFile();
+ }
public File getDirectory() {
return directory;
Added: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java?rev=1215432&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java (added)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LFUCache.java Sat Dec 17 07:04:24 2011
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * LFU cache implementation based on http://dhruvbird.com/lfu.pdf, with some notable differences:
+ * <ul>
+ * <li>
+ * Frequency list is stored as an array with no next/prev pointers between nodes: looping over the array should be faster and more CPU-cache friendly than
+ * using an ad-hoc linked-pointers structure.
+ * </li>
+ * <li>
+ * The max frequency is capped at the cache size to avoid creating more and more frequency list entries, and all elements residing in the max frequency entry
+ * are re-positioned in the frequency entry linked set in order to put most recently accessed elements ahead of less recently ones,
+ * which will be collected sooner.
+ * </li>
+ * <li>
+ * The eviction factor determines how many elements (more specifically, the percentage of) will be evicted.
+ * </li>
+ * </ul>
+ * As a consequence, this cache runs in *amortized* O(1) time (considering the worst case of having the lowest frequency at 0 and having to evict all
+ * elements).
+ *
+ * @author Sergio Bossa
+ */
+public class LFUCache<Key, Value> implements Map<Key, Value> {
+
+ private final Map<Key, CacheNode<Key, Value>> cache;
+ private final LinkedHashSet[] frequencyList;
+ private int lowestFrequency;
+ private int maxFrequency;
+ //
+ private final int maxCacheSize;
+ private final float evictionFactor;
+
+ public LFUCache(int maxCacheSize, float evictionFactor) {
+ if (evictionFactor <= 0 || evictionFactor >= 1) {
+ throw new IllegalArgumentException("Eviction factor must be greater than 0 and lesser than or equal to 1");
+ }
+ this.cache = new HashMap<Key, CacheNode<Key, Value>>(maxCacheSize);
+ this.frequencyList = new LinkedHashSet[maxCacheSize];
+ this.lowestFrequency = 0;
+ this.maxFrequency = maxCacheSize - 1;
+ this.maxCacheSize = maxCacheSize;
+ this.evictionFactor = evictionFactor;
+ initFrequencyList();
+ }
+
+ public Value put(Key k, Value v) {
+ Value oldValue = null;
+ CacheNode<Key, Value> currentNode = cache.get(k);
+ if (currentNode == null) {
+ if (cache.size() == maxCacheSize) {
+ doEviction();
+ }
+ LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[0];
+ currentNode = new CacheNode(k, v, 0);
+ nodes.add(currentNode);
+ cache.put(k, currentNode);
+ lowestFrequency = 0;
+ } else {
+ oldValue = currentNode.v;
+ currentNode.v = v;
+ }
+ return oldValue;
+ }
+
+
+ public void putAll(Map<? extends Key, ? extends Value> map) {
+ for (Map.Entry<? extends Key, ? extends Value> me : map.entrySet()) {
+ put(me.getKey(), me.getValue());
+ }
+ }
+
+ public Value get(Object k) {
+ CacheNode<Key, Value> currentNode = cache.get(k);
+ if (currentNode != null) {
+ int currentFrequency = currentNode.frequency;
+ if (currentFrequency < maxFrequency) {
+ int nextFrequency = currentFrequency + 1;
+ LinkedHashSet<CacheNode<Key, Value>> currentNodes = frequencyList[currentFrequency];
+ LinkedHashSet<CacheNode<Key, Value>> newNodes = frequencyList[nextFrequency];
+ moveToNextFrequency(currentNode, nextFrequency, currentNodes, newNodes);
+ cache.put((Key) k, currentNode);
+ if (lowestFrequency == currentFrequency && currentNodes.isEmpty()) {
+ lowestFrequency = nextFrequency;
+ }
+ } else {
+ // Hybrid with LRU: put most recently accessed ahead of others:
+ LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[currentFrequency];
+ nodes.remove(currentNode);
+ nodes.add(currentNode);
+ }
+ return currentNode.v;
+ } else {
+ return null;
+ }
+ }
+
+ public Value remove(Object k) {
+ CacheNode<Key, Value> currentNode = cache.remove(k);
+ if (currentNode != null) {
+ LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[currentNode.frequency];
+ nodes.remove(currentNode);
+ if (lowestFrequency == currentNode.frequency) {
+ findNextLowestFrequency();
+ }
+ return currentNode.v;
+ } else {
+ return null;
+ }
+ }
+
+ public int frequencyOf(Key k) {
+ CacheNode<Key, Value> node = cache.get(k);
+ if (node != null) {
+ return node.frequency + 1;
+ } else {
+ return 0;
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i <= maxFrequency; i++) {
+ frequencyList[i].clear();
+ }
+ cache.clear();
+ lowestFrequency = 0;
+ }
+
+ public Set<Key> keySet() {
+ return this.cache.keySet();
+ }
+
+ public Collection<Value> values() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Set<Entry<Key, Value>> entrySet() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int size() {
+ return cache.size();
+ }
+
+ public boolean isEmpty() {
+ return this.cache.isEmpty();
+ }
+
+ public boolean containsKey(Object o) {
+ return this.cache.containsKey(o);
+ }
+
+ public boolean containsValue(Object o) {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ private void initFrequencyList() {
+ for (int i = 0; i <= maxFrequency; i++) {
+ frequencyList[i] = new LinkedHashSet<CacheNode<Key, Value>>();
+ }
+ }
+
+ private void doEviction() {
+ int currentlyDeleted = 0;
+ float target = maxCacheSize * evictionFactor;
+ while (currentlyDeleted < target) {
+ LinkedHashSet<CacheNode<Key, Value>> nodes = frequencyList[lowestFrequency];
+ if (nodes.isEmpty()) {
+ throw new IllegalStateException("Lowest frequency constraint violated!");
+ } else {
+ Iterator<CacheNode<Key, Value>> it = nodes.iterator();
+ while (it.hasNext() && currentlyDeleted++ < target) {
+ CacheNode<Key, Value> node = it.next();
+ it.remove();
+ cache.remove(node.k);
+ }
+ if (!it.hasNext()) {
+ findNextLowestFrequency();
+ }
+ }
+ }
+ }
+
+ private void moveToNextFrequency(CacheNode<Key, Value> currentNode, int nextFrequency, LinkedHashSet<CacheNode<Key, Value>> currentNodes, LinkedHashSet<CacheNode<Key, Value>> newNodes) {
+ currentNodes.remove(currentNode);
+ newNodes.add(currentNode);
+ currentNode.frequency = nextFrequency;
+ }
+
+ private void findNextLowestFrequency() {
+ while (lowestFrequency <= maxFrequency && frequencyList[lowestFrequency].isEmpty()) {
+ lowestFrequency++;
+ }
+ if (lowestFrequency > maxFrequency) {
+ lowestFrequency = 0;
+ }
+ }
+
+ private static class CacheNode<Key, Value> {
+
+ public final Key k;
+ public Value v;
+ public int frequency;
+
+ public CacheNode(Key k, Value v, int frequency) {
+ this.k = k;
+ this.v = v;
+ this.frequency = frequency;
+ }
+
+ }
+}
\ No newline at end of file
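
For context on how the new class behaves, here is a small hypothetical driver (not part of the commit) exercising LFUCache directly. With a capacity of 3 and an eviction factor of 0.5, adding a fourth entry evicts the two entries still sitting at the lowest frequency, so the key that was read repeatedly survives:

import org.apache.kahadb.util.LFUCache;

public class LFUCacheDemo {
    public static void main(String[] args) {
        // Capacity 3; on overflow evict ~50% of the entries, lowest frequency first.
        LFUCache<String, String> cache = new LFUCache<String, String>(3, 0.5f);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3");

        cache.get("a");        // promotes "a" to frequency 1
        cache.get("a");        // ... and then to frequency 2

        cache.put("d", "4");   // cache is full: "b" and "c" (frequency 0) are evicted

        System.out.println(cache.containsKey("a"));  // true  - kept, accessed most often
        System.out.println(cache.containsKey("b"));  // false - evicted
        System.out.println(cache.size());            // 2 ("a" and "d")
        System.out.println(cache.frequencyOf("a"));  // 3 (stored frequency + 1)
    }
}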