You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by sa...@apache.org on 2011/09/09 14:29:08 UTC
svn commit: r1167130 - in /directory/apacheds/trunk/jdbm/src:
main/java/jdbm/btree/ main/java/jdbm/helper/ main/java/jdbm/recman/
test/java/jdbm/btree/
Author: saya
Date: Fri Sep 9 12:29:08 2011
New Revision: 1167130
URL: http://svn.apache.org/viewvc?rev=1167130&view=rev
Log:
Some fixes and more tests for the recent jdbm-related changes.
Added two tests that exercise reads by multiple reader threads alongside a single writer thread. Also tested the case where a reader/writer cannot find a replaceable cache entry.
Added toString methods and messages to assert statements.
Based on the tests, fixed some lock-related issues that occurred when no free cache entry was found.
Modified:
directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BPage.java
directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BTree.java
directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionContext.java
directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionVersioning.java
directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ExplicitList.java
directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/LRUCache.java
directory/apacheds/trunk/jdbm/src/main/java/jdbm/recman/SnapshotRecordManager.java
directory/apacheds/trunk/jdbm/src/test/java/jdbm/btree/TestSnapshotBTree.java
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BPage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BPage.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BPage.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BPage.java Fri Sep 9 12:29:08 2011
@@ -401,7 +401,7 @@ public class BPage<K, V> implements Seri
if ( replace )
{
- pageNewCopy.values[index] = value;
+ pageNewCopy.values[index] = btree.copyValue( value );
btree.recordManager.update( recordId, pageNewCopy, this );
}
@@ -956,10 +956,10 @@ public class BPage<K, V> implements Seri
/**
* Set the entry at the given index.
*/
- private void setEntry( BPage<K, V> page, int index, K key, V value )
+ private void setEntry( BPage<K, V> page, int index, K key, V value ) throws IOException
{
page.keys[index] = key;
- page.values[index] = value;
+ page.values[index] = btree.copyValue( value );
}
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BTree.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BTree.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BTree.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/btree/BTree.java Fri Sep 9 12:29:08 2011
@@ -949,6 +949,10 @@ public class BTree<K, V> implements Exte
byte[] array;
V valueCopy = null;
+
+ if ( value == null )
+ return null;
+
if ( this.valueSerializer != null )
{
array = this.valueSerializer.serialize( value );
@@ -969,7 +973,7 @@ public class BTree<K, V> implements Exte
out.flush();
byte[] arr = bout.toByteArray();
bin = new ByteArrayInputStream( arr );
- in =new ObjectInputStream( bin );
+ in = new ObjectInputStream( bin );
valueCopy = ( V )in.readObject();
}
catch ( ClassNotFoundException e )
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionContext.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionContext.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionContext.java Fri Sep 9 12:29:08 2011
@@ -45,7 +45,7 @@ public class ActionContext
public void endAction()
{
- assert( version != null );
+ assert( version != null ) : "Unexpected action state during endAction: " + this;
version = null;
}
@@ -78,4 +78,17 @@ public class ActionContext
{
return whoStarted;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "ActionContext: " );
+ sb.append( "(readOnly: " ).append( readOnly );
+ sb.append( ", version: " ).append( version );
+ sb.append( ", whoStarted: " ).append( whoStarted );
+ sb.append( ")\n" );
+
+ return sb.toString();
+ }
}
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionVersioning.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionVersioning.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionVersioning.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ActionVersioning.java Fri Sep 9 12:29:08 2011
@@ -153,7 +153,7 @@ public class ActionVersioning
{
long numActions = version.getNumActions().decrementAndGet();
- assert( numActions >= 0 );
+ assert( numActions >= 0 ) : "NumActions zero when read action is ended : " + version;
if ( ( numActions > 0 ) || ( version == readReference.get() ) )
{
@@ -216,5 +216,17 @@ public class ActionVersioning
{
return version;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "Version: ");
+ sb.append( "(vesion: " ).append( version );
+ sb.append( ", numActions: " ).append( numActions );
+ sb.append( ")\n" );
+
+ return sb.toString();
+ }
}
}
\ No newline at end of file
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ExplicitList.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ExplicitList.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ExplicitList.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/ExplicitList.java Fri Sep 9 12:29:08 2011
@@ -31,8 +31,10 @@ package jdbm.helper;
public class ExplicitList<T>
{
- Link<T> head = new Link<T>( null );
+ private Link<T> head = new Link<T>( null );
+ private int listSize = 0;
+
public static class Link<V>
{
private V element;
@@ -73,7 +75,7 @@ public class ExplicitList<T>
public void remove()
{
- assert ( isLinked() );
+ assert( isLinked() ) : "Trying to remove from list an unlinked link";
this.getPrev().setNext( this.getNext() );
this.getNext().setPrev( this.getPrev() );
this.reset();
@@ -82,6 +84,7 @@ public class ExplicitList<T>
public void addAfter( Link<V> after )
{
+ assert( this.isUnLinked() ) : "Trying to add to list already linked link: " + this;
after.getNext().setPrev( this );
this.setNext( after.getNext() );
after.setNext( this );
@@ -91,6 +94,7 @@ public class ExplicitList<T>
public void addBefore( Link<V> before )
{
+ assert( this.isUnLinked() ) : "Trying to add to list already linked link: " + this;
before.getPrev().setNext( this );
this.setPrev( before.getPrev() );
before.setPrev( this );
@@ -98,6 +102,11 @@ public class ExplicitList<T>
}
+ /**
+ * Splices the given list by making this link as the new head.
+ *
+ * @param listHead head of the existing list
+ */
public void splice( Link<V> listHead )
{
Link<V> prevLink = listHead.getPrev();
@@ -110,7 +119,7 @@ public class ExplicitList<T>
public boolean isUnLinked()
{
- return ( prev == this && next == this );
+ return ( ( prev == this ) && ( next == this ) );
}
@@ -129,7 +138,7 @@ public class ExplicitList<T>
public void uninit()
{
- assert ( this.isUnLinked() );
+ assert ( this.isUnLinked() ) : " Unitializing a still linked entry" + this;
element = null;
}
@@ -138,23 +147,39 @@ public class ExplicitList<T>
{
return this.element;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "Link: " ).append( this ).append( " " );
+ sb.append( "(next: " ).append( next );
+ sb.append( ",prev: " ).append( prev ).append(")");
+ sb.append( "\n" );
+
+ return sb.toString();
+ }
}
public void remove( Link<T> link )
{
+ assert( listSize > 0 ) : "Trying to remove link " + link + " from a list with no elements";
+ listSize--;
link.remove();
}
public void addFirst( Link<T> link )
{
+ listSize++;
link.addAfter( head );
}
public void addLast( Link<T> link )
{
+ listSize++;
link.addBefore( head );
}
@@ -169,5 +194,21 @@ public class ExplicitList<T>
{
return head;
}
+
+ public int size()
+ {
+ return listSize;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "List: " );
+ sb.append( "(size: " ).append( listSize ).append( ")" );
+ sb.append( "\n" );
+
+ return sb.toString();
+ }
}
\ No newline at end of file
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/LRUCache.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/LRUCache.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/LRUCache.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/helper/LRUCache.java Fri Sep 9 12:29:08 2011
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -65,7 +65,7 @@ public class LRUCache<K, V>
private final int numBuckets;
/** Log of number of hash buckets each latch protects */
- private final static int LOG_BUCKET_PER_LATCH = 3;
+ private final static int LOG_BUCKET_PER_LATCH = 0;
/** Number of lrus */
private final static int NUM_LRUS = 16;
@@ -74,7 +74,7 @@ public class LRUCache<K, V>
private final static int MIN_ENTRIES = 1 << 10;
/** Max sleep time(in ms) for writes in case of cache eviction failure */
- private final static long MAX_WRITE_SLEEP_TIME = 10000;
+ private final static long MAX_WRITE_SLEEP_TIME = 600000;
/** lru list */
LRU lrus[];
@@ -94,8 +94,18 @@ public class LRUCache<K, V>
/** minimum version cache has to satisfy during reads */
private long minReadVersion;
-
-
+ /** Stats to keep track of cache gets */
+ private long cacheGets;
+
+ /** Stats to keep track of cache hits for cache gets */
+ private long cacheMisses;
+
+ /** Stats to keep track of cache puts */
+ private long cachePuts;
+
+ /** Stats to keep track of # of times writes sleep for free cache entry */
+ private long cachePutSleeps;
+
@SuppressWarnings("unchecked")
public LRUCache( EntryIO<K, V> entryIO, int cacheSize )
{
@@ -196,6 +206,8 @@ public class LRUCache<K, V>
* While reading or waiting, latch is released.
*/
+ this.cachePuts++;
+
while ( true )
{
latches[latchIndex].lock();
@@ -226,7 +238,18 @@ public class LRUCache<K, V>
if ( !entry.isCurrentVersion() )
{
- CacheEntry newEntry = this.findNewEntry( key, latchIndex );
+ assert( entry.isNeverReplace() == false ) : " Non current entry should not have neverReplace set " + entry;
+
+ entry.setNeverReplace();
+ CacheEntry newEntry = null;
+ try
+ {
+ newEntry = this.findNewEntry( key, hashIndex >> LOG_BUCKET_PER_LATCH );
+ }
+ finally
+ {
+ entry.clearNeverReplace();
+ }
/*
* Remove existing entry, chain as a snapshot
@@ -269,7 +292,7 @@ public class LRUCache<K, V>
// FALLTHROUGH
default:
- assert ( false );
+ assert ( false ): "Unknown cache entry state: " + entry ;
}
}
else
@@ -287,6 +310,7 @@ public class LRUCache<K, V>
if ( sleepForFreeEntry == false )
{
+ System.out.println(" NO cache entry for write " + totalSleepTime );
throw e;
}
}
@@ -314,6 +338,9 @@ public class LRUCache<K, V>
break;
}
}
+
+ if ( totalSleepTime != 0 )
+ this.cachePutSleeps++;
}
@@ -346,6 +373,9 @@ public class LRUCache<K, V>
*
* While reading or waiting, latch is released.
*/
+
+ this.cacheGets++;
+
latches[latchIndex].lock();
boolean chainExists = false;
@@ -376,8 +406,21 @@ public class LRUCache<K, V>
if (value != null)
break;
-
- CacheEntry newEntry = this.findNewEntry( key, latchIndex );
+
+ this.cacheMisses++;
+
+ assert( entry.isNeverReplace() == false ) : "Non Current Entry has neverReplace set to true:" + entry;
+
+ entry.setNeverReplace();
+ CacheEntry newEntry = null;
+ try
+ {
+ newEntry = this.findNewEntry( key, hashIndex >> LOG_BUCKET_PER_LATCH );
+ }
+ finally
+ {
+ entry.clearNeverReplace();
+ }
/*
* Remove existing entry, chain as a snapshot
@@ -411,16 +454,18 @@ public class LRUCache<K, V>
case ENTRY_INITIAL:
LOG.warn( "Entry with key {} is at intial while trying to read from it", entry.getKey() );
+ this.cacheMisses++;
this.doRead( entry, latches[latchIndex], serializer );
value = this.searchChainForVersion( entry, version );
break;
default:
- assert ( false );
+ assert ( false ) : "Unknown cache entry state: " + entry;
}
}
else
{
+ this.cacheMisses++;
entry = this.findNewEntry( key, latchIndex );
buckets[hashIndex].add( entry );
this.doRead( entry, latches[latchIndex], serializer );
@@ -444,6 +489,21 @@ public class LRUCache<K, V>
return value;
}
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "LRUCache: " );
+ sb.append( "(numEntries:" ).append( this.numEntries );
+ sb.append( ",maxEntries:" ).append( this.maxEntries );
+ sb.append( ",cacheGets:" ).append( this.cacheGets );
+ sb.append( ",cacheMisses:" ).append( this.cacheMisses );
+ sb.append( ",cachePuts:" ).append( this.cachePuts );
+ sb.append( ",cachePutSleeps:" ).append( this.cachePutSleeps );
+ sb.append( ")\n" );
+
+ return sb.toString();
+ }
/**
* Creates a new version of the given entry with the given new version.
@@ -461,12 +521,29 @@ public class LRUCache<K, V>
private void putNewVersion( CacheEntry entry, K key, V value, long newVersion, int hashIndex,
Lock latch, Serializer serializer, boolean neverReplace ) throws IOException, CacheEvictionException
{
+
if ( entry.getStartVersion() != newVersion )
{
- CacheEntry newEntry = this.findNewEntry( key, hashIndex >> LOG_BUCKET_PER_LATCH );
-
- // Initialize and set to new version
- newEntry.initialize( key );
+
+ boolean resetNeverReplace = true;
+ if ( entry.isNeverReplace() )
+ resetNeverReplace = false;
+
+ entry.setNeverReplace();
+ CacheEntry newEntry = null;
+ try
+ {
+ newEntry = this.findNewEntry( key, hashIndex >> LOG_BUCKET_PER_LATCH );
+ }
+ finally
+ {
+ if ( resetNeverReplace )
+ entry.clearNeverReplace();
+ }
+
+
+
+ // Set to new version
newEntry.setAsCurrentVersion( value, newVersion );
/*
@@ -482,7 +559,7 @@ public class LRUCache<K, V>
}
else
{
- assert( entry.isCurrentVersion() );
+ assert( entry.isCurrentVersion() ) : "Entry not at expected version: " + entry ;
// Entry already at current version. Just update the value
entry.setAsCurrentVersion( value, newVersion );
@@ -506,7 +583,11 @@ public class LRUCache<K, V>
* Not much we can do here, just leave the entry in an
* inconsistent state.
*/
+ latch.lock();
+
+
entry.setState( EntryState.ENTRY_INITIAL );
+ entry.clearNeverReplace();
if ( entry.anyWaiters() )
{
@@ -559,14 +640,15 @@ public class LRUCache<K, V>
if ( curEntry.getState() != EntryState.ENTRY_READY )
{
- assert( curEntry == head );
+ assert( curEntry == head ) : "Unexpected state for entry: " + curEntry;
curLink = curLink.getNext();
continue;
}
if ( curStartVersion != 0 && ( curEntry.getEndVersion() > curStartVersion ) )
{
- assert( false );
+ assert( false ) : "Unexpected version number for entry. curStartVersion: "
+ + curStartVersion + " entry: " + curEntry;
}
curStartVersion = curEntry.getStartVersion();
@@ -594,7 +676,7 @@ public class LRUCache<K, V>
if ( value == null && mustFind == true )
{
- assert( false );
+ assert( false ) : "Traversed all versions and could not find cache entry";
}
return value;
@@ -711,18 +793,17 @@ public class LRUCache<K, V>
numEntries.incrementAndGet();
CacheEntry newEntry = new CacheEntry( index );
lru = lrus[index];
+ newEntry.initialize( key );
lru.getLock().lock();
lru.addToLRU( newEntry );
lru.getLock().unlock();
- newEntry.initialize( key );
return newEntry;
}
/*
* We start with a lru determined by the lru randomizer and try to lock the lru without waiting.
- * If this doesnt work, we wait on the first lru lock. Once we get the lru, we walk over each lru
- * (this time waiting on the lock when we switch to a new lru) and try to find a victim.
+ * If this doesnt work, we wait on the first lru lock.
*/
CacheEntry victimEntry = null;
lru = null;
@@ -747,37 +828,23 @@ public class LRUCache<K, V>
lru.getLock().lock();
}
- int startingIndex = curIndex;
+ victimEntry = lru.findVictim( latchIndex );
- do
- {
- victimEntry = lru.findVictim( latchIndex );
- lru.getLock().unlock();
-
- if ( victimEntry != null )
- {
- break;
- }
-
- curIndex = (curIndex + 1) % NUM_LRUS;
- if ( curIndex == startingIndex )
- break;
-
- lru = lrus[curIndex];
- lru.getLock().lock();
- }
- while ( true );
if ( victimEntry != null )
{
victimEntry.initialize( key );
+ lru.getLock().unlock();
}
else
{
+ lru.getLock().unlock();
+
LOG.warn( "Cache eviction failure: " + this.minReadVersion );
throw new CacheEvictionException( null );
}
+
return victimEntry;
}
@@ -856,17 +923,18 @@ public class LRUCache<K, V>
{
this.key = key;
value = null;
- startVersion = endVersion = 0;
+ startVersion = 0;
+ endVersion = Long.MAX_VALUE;
stateCondition = null;
- assert ( numWaiters == 0 );
+ assert ( numWaiters == 0 ) : "Numwaiters is not zero when entry is newly initialized: " + this;
state = EntryState.ENTRY_INITIAL;
assert ( versionsLink.isUnLinked() == true );
hashIndex = hash( key ) & ( numBuckets - 1 );
- assert( neverReplace == false );
+ assert( neverReplace == false ) : "Neverreplace is true when entry is newly intialized:" + this;
}
public void setNeverReplace()
@@ -874,6 +942,16 @@ public class LRUCache<K, V>
neverReplace = true;
}
+ public void clearNeverReplace()
+ {
+ neverReplace = false;
+ }
+
+ public boolean isNeverReplace()
+ {
+ return neverReplace;
+ }
+
public K getKey()
{
@@ -918,7 +996,7 @@ public class LRUCache<K, V>
public void decrementWaiters()
{
- assert ( numWaiters > 0 );
+ assert ( numWaiters > 0 ) : "Unexpected num waiters for entry:" + this;
numWaiters--;
}
@@ -1000,10 +1078,10 @@ public class LRUCache<K, V>
public void setAsSnapshotVersion( long newEndVersion )
{
- this.endVersion = newEndVersion;
- neverReplace = false;
+ this.clearNeverReplace();
LRU lru = this.getLru();
lru.getLock().lock();
+ this.endVersion = newEndVersion;
lru.addToSnapshots( this );
lru.getLock().unlock();
}
@@ -1014,6 +1092,23 @@ public class LRUCache<K, V>
return ( this.state != EntryState.ENTRY_READING && this.numWaiters == 0 &&
this.state != EntryState.ENTRY_WRITING && !neverReplace);
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "Entry: " );
+ sb.append("(state: ").append( this.state );
+ sb.append(",numWaiters:").append( this.numWaiters );
+ sb.append(",startVersion:").append( this.startVersion );
+ sb.append(",endVersion:").append( this.endVersion );
+ sb.append(",key:").append( this.key );
+ sb.append(",value:").append( this.value ).append( ")" );
+ sb.append( "\n" );
+
+ return sb.toString();
+
+ }
}
@@ -1023,11 +1118,17 @@ public class LRUCache<K, V>
private ExplicitList<CacheEntry> mostRecentVersions = new ExplicitList<CacheEntry>();
/** List of snapshot entries */
- private LinkedList<CacheEntry> snapshotVersions = new LinkedList<CacheEntry>();
+ private ExplicitList<CacheEntry> snapshotVersions = new ExplicitList<CacheEntry>();
/** Lock protecting the list */
private Lock lock = new ReentrantLock();
+ /** Number of snaphot versions created */
+ private int numSnapshotsCreated;
+
+ /** True if lru needs to be purged of unusable snapshot versions */
+ private boolean snapshotPurgeNeeded;
+
public Lock getLock()
{
return lock;
@@ -1054,7 +1155,9 @@ public class LRUCache<K, V>
public void addToSnapshots( CacheEntry entry )
{
mostRecentVersions.remove( entry.getLruLink() );
- snapshotVersions.addLast( entry );
+ snapshotVersions.addLast( entry.getLruLink() );
+
+ numSnapshotsCreated++;
}
@@ -1102,28 +1205,40 @@ public class LRUCache<K, V>
* gotten from the tail of the lru.
*/
- Iterator<CacheEntry> it = snapshotVersions.listIterator();
+ ExplicitList.Link<CacheEntry> curLink;
+
+ curLink = snapshotVersions.begin();
- while ( it.hasNext() )
+ while ( curLink != snapshotVersions.end() )
{
- victimEntry = it.next();
+ victimEntry = curLink.getElement();
if ( victimEntry.getEndVersion() > minReadVersion )
{
break;
}
+
+ assert( victimEntry.getKey() != null ) :
+ "Snapshot victimEntry doesnt have key set:" + victimEntry ;
- assert ( victimEntry.isEntryFreeable() == true );
-
+ if ( victimEntry.isNeverReplace() )
+ {
+ curLink = curLink.getNext();
+ continue;
+ }
victimBucketIndex = victimEntry.getHashIndex();
victimLatchIndex = (victimBucketIndex >> LOG_BUCKET_PER_LATCH );
if ( ( latchIndex != victimLatchIndex ) && ( latches[victimLatchIndex].tryLock() == false ) )
{
+ curLink = curLink.getNext();
continue;
}
+ assert( victimEntry.isEntryFreeable() == true ) :
+ "Snapshot victimEntry is not freeable:" + victimEntry ;
+
int hashChainIndex = buckets[victimEntry.getHashIndex()].indexOf( victimEntry );
if ( hashChainIndex != -1 )
@@ -1149,13 +1264,13 @@ public class LRUCache<K, V>
latches[victimLatchIndex].unlock();
}
- it.remove();
- this.mostRecentVersions.addLast( victimEntry.lruLink );
+ this.snapshotVersions.remove( victimEntry.getLruLink() );
+ this.mostRecentVersions.addLast( victimEntry.getLruLink() );
return victimEntry;
}
- ExplicitList.Link<CacheEntry> curLink = mostRecentVersions.begin();
+ curLink = mostRecentVersions.begin();
while ( curLink != mostRecentVersions.end() )
{
Modified: directory/apacheds/trunk/jdbm/src/main/java/jdbm/recman/SnapshotRecordManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/main/java/jdbm/recman/SnapshotRecordManager.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/main/java/jdbm/recman/SnapshotRecordManager.java (original)
+++ directory/apacheds/trunk/jdbm/src/main/java/jdbm/recman/SnapshotRecordManager.java Fri Sep 9 12:29:08 2011
@@ -625,6 +625,17 @@ public class SnapshotRecordManager imple
bigLock.unlock();
}
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "SnapshotRecordManager: " );
+ sb.append( "(lruCache:" ).append( this.versionedCache );
+ sb.append( ")\n" );
+
+ return sb.toString();
+ }
/**
Modified: directory/apacheds/trunk/jdbm/src/test/java/jdbm/btree/TestSnapshotBTree.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/jdbm/src/test/java/jdbm/btree/TestSnapshotBTree.java?rev=1167130&r1=1167129&r2=1167130&view=diff
==============================================================================
--- directory/apacheds/trunk/jdbm/src/test/java/jdbm/btree/TestSnapshotBTree.java (original)
+++ directory/apacheds/trunk/jdbm/src/test/java/jdbm/btree/TestSnapshotBTree.java Fri Sep 9 12:29:08 2011
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.io.Serializable;
+import java.util.Random;
import java.util.concurrent.Semaphore;
import jdbm.RecordManager;
@@ -47,8 +48,8 @@ import com.mycila.junit.concurrent.Concu
*
* @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
*/
-@RunWith(ConcurrentJunitRunner.class)
-@Concurrency()
+//@RunWith(ConcurrentJunitRunner.class)
+//@Concurrency()
public class TestSnapshotBTree
{
@Rule
@@ -77,8 +78,8 @@ public class TestSnapshotBTree
int idx;
int numReadThreads = 1;
- TestThread readThreads[] = new TestThread[numReadThreads];
- TestThread updateThread;
+ BasicTestThread readThreads[] = new BasicTestThread[numReadThreads];
+ BasicTestThread updateThread;
Semaphore browseSem = new Semaphore( 0 );
Semaphore updateSem = new Semaphore( 0 );
@@ -95,9 +96,9 @@ public class TestSnapshotBTree
for ( idx = 0; idx < numReadThreads; idx++ )
{
- readThreads[idx] = new TestThread( true, tree, browseSem, updateSem, numReadThreads );
+ readThreads[idx] = new BasicTestThread( true, tree, browseSem, updateSem, numReadThreads );
}
- updateThread = new TestThread( false, tree, browseSem, updateSem, numReadThreads );
+ updateThread = new BasicTestThread( false, tree, browseSem, updateSem, numReadThreads );
updateThread.start();
for ( idx = 0; idx < numReadThreads; idx++ )
@@ -115,8 +116,10 @@ public class TestSnapshotBTree
snapshotRecman.close();
}
+
- class TestThread extends Thread
+
+ class BasicTestThread extends Thread
{
boolean readOnly;
BTree<Integer, IntWrapper> btree;
@@ -124,8 +127,8 @@ public class TestSnapshotBTree
Semaphore updateSem;
int numReadThreads;
- TestThread( boolean readOnly, BTree<Integer, IntWrapper> btree, Semaphore firstBrowse,
- Semaphore updateDone, int numReadThreads)
+ BasicTestThread( boolean readOnly, BTree<Integer, IntWrapper> btree, Semaphore firstBrowse,
+ Semaphore updateDone, int numReadThreads )
{
this.readOnly = readOnly;
this.btree = btree;
@@ -242,12 +245,345 @@ public class TestSnapshotBTree
}
catch( IOException e )
{
+ e.printStackTrace();
+ assertTrue( false );
}
catch( InterruptedException e )
{
+ e.printStackTrace();
+ assertTrue( false );
+ }
+
+ }
+ } // end of class BasicTestThread
+
+
+ @Test
+ public void testLongBrowsing() throws IOException, InterruptedException
+ {
+ RecordManager recman;
+ BTree<Integer, IntWrapper> tree;
+ int numElements = 10000;
+
+ int idx;
+ int numReadThreads = 4;
+ LongBrowsingTestThread readThreads[] = new LongBrowsingTestThread[numReadThreads];
+ LongBrowsingTestThread updateThread;
+
+ recman = RecordManagerFactory.createRecordManager( getTemporaryFile( "testLongBrowsing" ) );
+ SnapshotRecordManager snapshotRecman = new SnapshotRecordManager( recman, 1 << 10 );
+
+ tree = new BTree<Integer, IntWrapper>( snapshotRecman, new IntegerComparator() );
+
+ for ( idx = 0; idx < numElements; idx++ )
+ {
+ tree.insert( new Integer( idx ), new IntWrapper( 0 ), true );
+ }
+
+ for ( idx = 0; idx < numReadThreads; idx++ )
+ {
+ readThreads[idx] = new LongBrowsingTestThread( true, tree, numElements);
+ }
+ updateThread = new LongBrowsingTestThread( false, tree, numElements );
+
+
+ readThreads[0].start();
+
+ Thread.sleep( 10 );
+
+ updateThread.start();
+ for ( idx = 1; idx < numReadThreads; idx++ )
+ {
+ Thread.sleep( 1000 );
+ readThreads[idx].start();
+ }
+
+
+ for ( idx = 0; idx < numReadThreads; idx++ )
+ {
+ readThreads[idx].join();
+ }
+ updateThread.join();
+
+ snapshotRecman.close();
+ }
+
+ class LongBrowsingTestThread extends Thread
+ {
+ boolean readOnly;
+ BTree<Integer, IntWrapper> btree;
+ int numElements;
+
+
+ LongBrowsingTestThread( boolean readOnly, BTree<Integer, IntWrapper> btree, int numElements)
+ {
+ this.readOnly = readOnly;
+ this.btree = btree;
+ this.numElements = numElements;
+ }
+
+
+
+ private void readOnlyActions() throws IOException, InterruptedException
+ {
+ int count = 0;
+ TupleBrowser<Integer, IntWrapper> browser = btree.browse();
+ Tuple<Integer, IntWrapper> tuple = new Tuple();
+
+ assert( browser.getNext( tuple ) );
+ int max = tuple.getValue().value;
+ count++;
+ System.out.println( " TestLongBrowsing read thread min key is" + tuple.getKey() + "max value is" + max );
+
+ while( browser.getNext( tuple ) )
+ {
+ count++;
+
+ // Sleep for a while to keep browsing long
+ Thread.sleep( 10 );
+
+
+ if ( tuple.getValue().value > max )
+ {
+ System.out.println(" tupe value:" + tuple.getValue().value + " Expected max:" + max + " count:" + count);
+
+ }
+
+ assertTrue( tuple.getValue().value <= max );
}
+
+
+ System.out.println( "TestLongBrowsing read thread count is " + count );
+ assertEquals( count, numElements );
+ browser.close();
+ }
+
+ private void readWriteActions()
+ {
+ int idx;
+ Random updateRandomizer = new Random();
+ try
+ {
+ for ( idx = 1; idx < 100; idx++ )
+ {
+ Integer key = new Integer( 0 );
+ IntWrapper value = btree.find( key );
+ value.value = idx;
+ btree.insert( key, value, true );
+
+ for ( int updates = 0; updates < 2048; updates++ )
+ {
+ key = new Integer( updateRandomizer.nextInt( numElements ) );
+ value = btree.find( key );
+
+ assertTrue( value.value <= idx );
+
+ value.value = idx;
+ btree.insert( key, value, true );
+ }
+ }
+
+ System.out.println( "TestLongBrowsing updates ended" );
+
+ }
+ catch( IOException e )
+ {
+ e.printStackTrace();
+ assertTrue( false );
+ }
}
- } // end of class TestThread
+
+
+ public void run()
+ {
+ try
+ {
+ if ( readOnly )
+ this.readOnlyActions();
+ else
+ this.readWriteActions();
+ }
+ catch( IOException e )
+ {
+ e.printStackTrace();
+ assertTrue( false );
+ }
+ catch( InterruptedException e )
+ {
+ e.printStackTrace();
+ assertTrue( false );
+ }
+
+ }
+ } // end of class LongBrowsingTestThread
+
+
+
+ @Test
+ public void testRemoveInsert() throws IOException, InterruptedException
+ {
+ RecordManager recman;
+ BTree<Integer, IntWrapper> tree;
+ int numElements = 10000;
+
+ int idx;
+ int numReadThreads = 4;
+ RemoveInsertTestThread readThreads[] = new RemoveInsertTestThread[numReadThreads];
+ RemoveInsertTestThread updateThread;
+
+ Semaphore browseSem = new Semaphore( 0 );
+
+ recman = RecordManagerFactory.createRecordManager( getTemporaryFile( "testRemoveInsert" ) );
+ SnapshotRecordManager snapshotRecman = new SnapshotRecordManager( recman, 1 << 12 );
+
+ tree = new BTree<Integer, IntWrapper>( snapshotRecman, new IntegerComparator() );
+
+ for ( idx = 0; idx < numElements; idx++ )
+ {
+ tree.insert( new Integer( idx ), new IntWrapper( 0 ), true );
+ }
+
+ for ( idx = 0; idx < numReadThreads; idx++ )
+ {
+ readThreads[idx] = new RemoveInsertTestThread( true, tree, numElements, browseSem, numReadThreads );
+ }
+ updateThread = new RemoveInsertTestThread( false, tree, numElements, browseSem, numReadThreads );
+
+
+ updateThread.start();
+ for ( idx = 0; idx < numReadThreads; idx++ )
+ {
+ Thread.sleep( 1000 );
+ readThreads[idx].start();
+ }
+
+
+ for ( idx = 0; idx < numReadThreads; idx++ )
+ {
+ readThreads[idx].join();
+ }
+ updateThread.join();
+
+ snapshotRecman.close();
+ }
+
+
+
+ class RemoveInsertTestThread extends Thread
+ {
+ boolean readOnly;
+ BTree<Integer, IntWrapper> btree;
+ int numElements;
+ Semaphore browseSem;
+ int numReadThreads;
+
+ RemoveInsertTestThread( boolean readOnly, BTree<Integer, IntWrapper> btree, int numElements, Semaphore browseSem, int numReadThreads )
+ {
+ this.readOnly = readOnly;
+ this.btree = btree;
+ this.numElements = numElements;
+ this.browseSem = browseSem;
+ this.numReadThreads = numReadThreads;
+ }
+
+ private void readOnlyActions() throws IOException, InterruptedException
+ {
+ int count = 0;
+ TupleBrowser<Integer, IntWrapper> browser = btree.browse();
+ Tuple<Integer, IntWrapper> tuple = new Tuple();
+
+ browseSem.release();
+
+ while( browser.getNext( tuple ) )
+ {
+ count++;
+
+ // Sleep for a while to keep browsing long
+ Thread.sleep( 10 );
+
+
+ if ( tuple.getValue().value == -1 )
+ {
+ System.out.println(" tupe key:" + tuple.getKey() + " value:" + tuple.getValue().value);
+
+ }
+
+ assertTrue( tuple.getValue().value != -1 );
+ }
+
+
+ System.out.println( "TestRemoveInsert read thread count is " + count );
+ assertEquals( count, numElements );
+ browser.close();
+ }
+
+ private void readWriteActions() throws IOException, InterruptedException
+ {
+ int idx;
+ Random updateRandomizer = new Random();
+
+ for ( idx = 0; idx < numReadThreads; idx++ )
+ browseSem.acquireUninterruptibly();
+
+
+ Integer key;
+ IntWrapper value = new IntWrapper( -1 );
+
+ for ( idx = 0; idx < 10; idx++ )
+ {
+ Thread.sleep( 10000 );
+
+ int startingIndex = updateRandomizer.nextInt( numElements );
+
+ for ( int updates = 0; updates < 32; updates++ )
+ {
+ key = new Integer( startingIndex + updates );
+
+ if ( key.intValue() >= numElements )
+ break;
+
+
+ btree.remove( key );
+ }
+
+ for ( int updates = 0; updates < 32; updates++ )
+ {
+ key = new Integer( startingIndex + updates );
+ btree.insert( key, value, true );
+ }
+ }
+
+ System.out.println( "TestRemoveInsert updates ended" );
+
+ }
+
+
+ public void run()
+ {
+ try
+ {
+ if ( readOnly )
+ this.readOnlyActions();
+ else
+ this.readWriteActions();
+ }
+ catch( IOException e )
+ {
+ e.printStackTrace();
+ assertTrue( false );
+ }
+ catch( InterruptedException e )
+ {
+ e.printStackTrace();
+ assertTrue( false );
+ }
+
+
+ }
+ } // end of class RemoveInsertTestThread
+
+
+
}
\ No newline at end of file