You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by sa...@apache.org on 2012/06/29 10:25:29 UTC
svn commit: r1355264 - in /directory/apacheds/branches/apacheds-txns:
core-api/src/main/java/org/apache/directory/server/core/api/
core-api/src/main/java/org/apache/directory/server/core/api/filtering/
core-api/src/main/java/org/apache/directory/server...
Author: saya
Date: Fri Jun 29 08:25:25 2012
New Revision: 1355264
URL: http://svn.apache.org/viewvc?rev=1355264&view=rev
Log:
Leaked cursor management. Entry filtering cursors which are open for more than a minute are redirected to a RandomFileCursor after closing the wrapped cursors.
Added:
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java
directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java
directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java
Modified:
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java
directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java
directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java
directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java
directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java Fri Jun 29 08:25:25 2012
@@ -39,6 +39,7 @@ import org.apache.directory.server.core.
import org.apache.directory.server.core.api.schema.SchemaPartition;
import org.apache.directory.server.core.api.subtree.SubentryCache;
import org.apache.directory.server.core.api.subtree.SubtreeEvaluator;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.server.core.api.txn.TxnLogManager;
import org.apache.directory.server.core.api.txn.TxnManager;
import org.apache.directory.shared.ldap.codec.api.LdapApiService;
@@ -626,6 +627,7 @@ public interface DirectoryService extend
TxnManager getTxnManager();
+ LeakedCursorManager getLeakedCursorManager();
TxnLogManager getTxnLogManager();
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java Fri Jun 29 08:25:25 2012
@@ -20,6 +20,7 @@ package org.apache.directory.server.core
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
import org.apache.directory.server.core.api.txn.TxnHandle;
@@ -46,10 +47,19 @@ public abstract class AbstractEntryFilte
/** The associated transaction */
protected TxnHandle transaction;
-
- /** True if a thread is using the txn */
- protected AtomicBoolean txnBusy = new AtomicBoolean( false );
-
+
+ /**
+ * Entry filtering cursor lock..any access to the cursor is through this lock
+ * The lock is reentrant as same thread may lock it several times without unlocking.
+ */
+ protected ReentrantLock lock = new ReentrantLock();
+
+ /** flag to detect the closed cursor */
+ protected boolean closed;
+
+ /** creation timestamp of the cursor */
+ protected long timestamp;
+
/**
* An instance for this class
@@ -112,11 +122,16 @@ public abstract class AbstractEntryFilte
LOG.info( "Cursor has been abandoned." );
}
}
+
-
- protected boolean maybeSetCurTxn() throws Exception
+ /**
+ * {@inheritDoc}
+ */
+ public void pinCursor()
{
- if ( transaction != null )
+ lock.lock();
+
+ if ( transaction != null )
{
TxnHandle curTxn = txnManager.getCurTxn();
@@ -129,22 +144,47 @@ public abstract class AbstractEntryFilte
}
else
{
- boolean busy = !txnBusy.compareAndSet( false, true );
-
- // This can happen if the abondon listener sneaked in and
- // closed the cursor. return immediately
- if ( busy )
- {
- throw new OperationAbandonedException();
- }
-
txnManager.setCurTxn( transaction );
-
- return true;
}
}
-
- return false;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void unpinCursor()
+ {
+ boolean checkForTxnUnset = false;
+
+ if ( txnManager != null )
+ {
+ checkForTxnUnset = ( txnManager.getCurTxn() == transaction );
+ }
+ lock.unlock();
+
+ if ( checkForTxnUnset && !lock.isHeldByCurrentThread() )
+ {
+ txnManager.setCurTxn( null );
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setTimestamp( long timestamp )
+ {
+ this.timestamp = timestamp;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public long getTimestamp()
+ {
+ return timestamp;
}
@@ -154,71 +194,28 @@ public abstract class AbstractEntryFilte
{
return;
}
+
+ if ( transaction.getState() == TxnHandle.State.COMMIT ||
+ transaction.getState() == TxnHandle.State.ABORT )
+ {
+ return;
+ }
// If this thread already owns the txn, then end it and return
TxnHandle curTxn = txnManager.getCurTxn();
- if ( curTxn != null )
+ if ( curTxn != transaction )
{
- if ( curTxn != transaction )
- {
- throw new IllegalStateException( "Shouldnt Have another txn running if cursor has a txn " );
- }
-
- if ( abort == false )
- {
- txnManager.commitTransaction();
- }
- else
- {
- txnManager.abortTransaction();
- }
+ throw new IllegalStateException( "Shouldnt Have another txn running if cursor has a txn " );
+ }
- txnBusy.set( false );
- txnManager.setCurTxn( null );
+ if ( abort == false )
+ {
+ txnManager.commitTransaction();
}
else
{
- while ( !( transaction.getState() == TxnHandle.State.COMMIT || transaction.getState() == TxnHandle.State.ABORT ) )
- {
- boolean busy = !txnBusy.compareAndSet( false, true );
-
- // This can happen if the abondon listener sneaked in and
- // closed the cursor. return immediately
- if ( busy )
- {
- try
- {
- Thread.sleep( 100 );
- }
- catch ( Exception e )
- {
- //ignore
- }
- continue;
- }
-
- if ( transaction.getState() == TxnHandle.State.COMMIT ||
- transaction.getState() == TxnHandle.State.ABORT )
- {
- txnBusy.set( false );
- break;
- }
-
- txnManager.setCurTxn( transaction );
-
- if ( abort == false )
- {
- txnManager.commitTransaction();
- }
- else
- {
- txnManager.abortTransaction();
- }
-
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ txnManager.abortTransaction();
}
}
}
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java Fri Jun 29 08:25:25 2012
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.directory.server.core.api.entry.ClonedServerEntry;
import org.apache.directory.server.core.api.entry.ClonedServerEntrySearch;
import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.server.core.api.txn.TxnHandle;
import org.apache.directory.shared.i18n.I18n;
import org.apache.directory.shared.ldap.model.cursor.ClosureMonitor;
@@ -58,7 +59,7 @@ public class BaseEntryFilteringCursor ex
private static final Logger LOG = LoggerFactory.getLogger( BaseEntryFilteringCursor.class );
/** the underlying wrapped search results Cursor */
- private final Cursor<Entry> wrapped;
+ private Cursor<Entry> wrapped;
/** the list of filters to be applied */
private final List<EntryFilter> filters;
@@ -169,9 +170,9 @@ public class BaseEntryFilteringCursor ex
* {@inheritDoc}
*/
public void afterLast() throws Exception
- {
- boolean setCurTxn = maybeSetCurTxn();
-
+ {
+ pinCursor();
+
try
{
wrapped.afterLast();
@@ -179,11 +180,7 @@ public class BaseEntryFilteringCursor ex
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -193,7 +190,15 @@ public class BaseEntryFilteringCursor ex
*/
public boolean available()
{
- return prefetched != null;
+ boolean result;
+
+ pinCursor();
+
+ result = ( prefetched != null );
+
+ unpinCursor();
+
+ return result;
}
@@ -211,8 +216,8 @@ public class BaseEntryFilteringCursor ex
*/
public void beforeFirst() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
wrapped.beforeFirst();
@@ -220,11 +225,7 @@ public class BaseEntryFilteringCursor ex
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -233,15 +234,20 @@ public class BaseEntryFilteringCursor ex
* {@inheritDoc}
*/
public void close() throws Exception
- {
+ {
+ pinCursor();
+ closed = true;
+
try
- {
+ {
+
wrapped.close();
prefetched = null;
}
finally
{
endTxnAtClose( false );
+ unpinCursor();
}
}
@@ -251,14 +257,18 @@ public class BaseEntryFilteringCursor ex
*/
public void close( Exception reason ) throws Exception
{
+ pinCursor();
+ closed = true;
+
try
- {
+ {
wrapped.close( reason );
prefetched = null;
}
finally
{
endTxnAtClose( true );
+ unpinCursor();
}
}
@@ -283,9 +293,9 @@ public class BaseEntryFilteringCursor ex
close();
throw new OperationAbandonedException();
}
-
- boolean setCurTxn = maybeSetCurTxn();
-
+
+ pinCursor();
+
try
{
beforeFirst();
@@ -293,11 +303,7 @@ public class BaseEntryFilteringCursor ex
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -307,11 +313,20 @@ public class BaseEntryFilteringCursor ex
*/
public Entry get() throws Exception
{
+ Entry result;
+
+ pinCursor();
+
if ( available() )
{
- return prefetched;
+ result = prefetched;
+ unpinCursor();
+
+ return result;
}
+ unpinCursor();
+
throw new InvalidCursorPositionException();
}
@@ -321,7 +336,7 @@ public class BaseEntryFilteringCursor ex
*/
public boolean isClosed() throws Exception
{
- return wrapped.isClosed();
+ return closed;
}
@@ -336,9 +351,9 @@ public class BaseEntryFilteringCursor ex
close();
throw new OperationAbandonedException();
}
-
- boolean setCurTxn = maybeSetCurTxn();
-
+
+ pinCursor();
+
try
{
afterLast();
@@ -346,11 +361,7 @@ public class BaseEntryFilteringCursor ex
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -486,11 +497,11 @@ public class BaseEntryFilteringCursor ex
close();
throw new OperationAbandonedException();
}
-
- boolean setCurTxn = maybeSetCurTxn();
-
+
+ pinCursor();
+
try
- {
+ {
Entry tempResult = null;
outer: while ( wrapped.next() )
@@ -548,17 +559,13 @@ public class BaseEntryFilteringCursor ex
return true;
}
-
+
prefetched = null;
return false;
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -574,18 +581,18 @@ public class BaseEntryFilteringCursor ex
close();
throw new OperationAbandonedException();
}
-
- boolean setCurTxn = maybeSetCurTxn();
-
+
+ pinCursor();
+
try
{
Entry tempResult = null;
-
+
outer: while ( wrapped.previous() )
{
boolean accepted = true;
tempResult = new ClonedServerEntrySearch( wrapped.get() );
-
+
/*
* O P T I M I Z A T I O N
* -----------------------
@@ -593,23 +600,23 @@ public class BaseEntryFilteringCursor ex
* Don't want to waste cycles on enabling a loop for processing
* filters if we have zero or one filter.
*/
-
+
if ( filters.isEmpty() )
{
prefetched = tempResult;
filterContents( prefetched );
return true;
}
-
+
if ( ( filters.size() == 1 ) && filters.get( 0 ).accept( getOperationContext(), tempResult ) )
{
prefetched = tempResult;
filterContents( prefetched );
return true;
}
-
+
/* E N D O P T I M I Z A T I O N */
-
+
for ( EntryFilter filter : filters )
{
// if a filter rejects then short and continue with outer loop
@@ -618,7 +625,7 @@ public class BaseEntryFilteringCursor ex
continue outer;
}
}
-
+
/*
* Here the entry has been accepted by all filters.
*/
@@ -626,19 +633,50 @@ public class BaseEntryFilteringCursor ex
filterContents( prefetched );
return true;
}
-
+
prefetched = null;
-
- return false;
+
+ return false;
}
finally
{
- if ( setCurTxn )
+ unpinCursor();
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void doLeakedCursorManagement( LeakedCursorManager leakedCursorManager ) throws Exception
+ {
+ Cursor<Entry> oldCursor;
+ boolean doNext = false;
+
+ pinCursor();
+
+ if ( previous() )
+ {
+ doNext = true;
+ }
+
+ oldCursor = wrapped;
+
+ try
+ {
+ wrapped = leakedCursorManager.createLeakedCursor( this );
+ }
+ finally
+ {
+ if ( doNext )
{
- txnBusy.set( false );
- txnManager.setCurTxn( null );
+ next();
}
}
+
+ oldCursor.close();
+
+ unpinCursor();
}
@@ -688,7 +726,7 @@ public class BaseEntryFilteringCursor ex
{
throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
.concat( "." ).concat( "isLast()" ) ) );
- }
+ }
}
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java Fri Jun 29 08:25:25 2012
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.shared.i18n.I18n;
import org.apache.directory.shared.ldap.model.cursor.ClosureMonitor;
import org.apache.directory.shared.ldap.model.cursor.Cursor;
@@ -47,20 +48,17 @@ import org.slf4j.LoggerFactory;
public class CursorList extends AbstractEntryFilteringCursor
{
/** The inner List */
- private final List<EntryFilteringCursor> list;
+ private List<EntryFilteringCursor> list;
/** The starting position for the cursor in the list. It can be > 0 */
- private final int start;
+ private int start;
/** The ending position for the cursor in the list. It can be < List.size() */
- private final int end;
+ private int end;
/** The current position in the list */
private int index = -1;
- /** flag to detect the closed cursor */
- private boolean closed;
-
private static final Logger LOG = LoggerFactory.getLogger( CursorList.class );
@@ -128,12 +126,21 @@ public class CursorList extends Abstract
*/
public boolean available()
{
- if ( ( index >= 0 ) && ( index < end ) )
+ pinCursor();
+
+ try
{
- return list.get( index ).available();
- }
+ if ( ( index >= 0 ) && ( index < end ) )
+ {
+ return list.get( index ).available();
+ }
- return false;
+ return false;
+ }
+ finally
+ {
+ unpinCursor();
+ }
}
@@ -162,8 +169,8 @@ public class CursorList extends Abstract
*/
public void beforeFirst() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
this.index = 0;
@@ -171,11 +178,7 @@ public class CursorList extends Abstract
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -185,8 +188,8 @@ public class CursorList extends Abstract
*/
public void afterLast() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
this.index = end - 1;
@@ -194,14 +197,9 @@ public class CursorList extends Abstract
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
-
}
@@ -210,8 +208,8 @@ public class CursorList extends Abstract
*/
public boolean first() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
if ( list.size() > 0 )
@@ -219,16 +217,12 @@ public class CursorList extends Abstract
index = start;
return list.get( index ).first();
}
-
+
return false;
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -238,8 +232,8 @@ public class CursorList extends Abstract
*/
public boolean last() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
if ( list.size() > 0 )
@@ -247,16 +241,12 @@ public class CursorList extends Abstract
index = end - 1;
return list.get( index ).last();
}
-
+
return false;
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -266,21 +256,17 @@ public class CursorList extends Abstract
*/
public boolean isFirst() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
return ( list.size() > 0 ) && ( index == start ) && list.get( index ).first();
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
-
+
}
@@ -289,19 +275,15 @@ public class CursorList extends Abstract
*/
public boolean isLast() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
return ( list.size() > 0 ) && ( index == end - 1 ) && list.get( index ).last();
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -329,8 +311,8 @@ public class CursorList extends Abstract
*/
public boolean previous() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
// if parked at -1 we cannot go backwards
@@ -338,7 +320,7 @@ public class CursorList extends Abstract
{
return false;
}
-
+
// if the index moved back is still greater than or eq to start then OK
if ( index - 1 >= start )
{
@@ -346,7 +328,7 @@ public class CursorList extends Abstract
{
index--;
}
-
+
if ( !list.get( index ).previous() )
{
index--;
@@ -364,7 +346,7 @@ public class CursorList extends Abstract
return true;
}
}
-
+
// if the index currently less than or equal to start we need to park it at -1 and return false
if ( index <= start )
{
@@ -378,22 +360,18 @@ public class CursorList extends Abstract
return true;
}
}
-
+
if ( list.size() <= 0 )
{
index = -1;
}
-
+
return false;
-
+
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -403,8 +381,8 @@ public class CursorList extends Abstract
*/
public boolean next() throws Exception
{
- boolean setCurTxn = maybeSetCurTxn();
-
+ pinCursor();
+
try
{
// if parked at -1 we advance to the start index and return true
@@ -413,7 +391,7 @@ public class CursorList extends Abstract
index = start;
return list.get( index ).next();
}
-
+
// if the index plus one is less than the end then increment and return true
if ( list.size() > 0 && index + 1 < end )
{
@@ -434,7 +412,7 @@ public class CursorList extends Abstract
return true;
}
}
-
+
// if the index plus one is equal to the end then increment and return false
if ( list.size() > 0 && index + 1 == end )
{
@@ -448,21 +426,17 @@ public class CursorList extends Abstract
return true;
}
}
-
+
if ( list.size() <= 0 )
{
index = end;
}
-
+
return false;
}
finally
{
- if ( setCurTxn )
- {
- txnBusy.set( false );
- txnManager.setCurTxn( null );
- }
+ unpinCursor();
}
}
@@ -472,14 +446,25 @@ public class CursorList extends Abstract
*/
public Entry get() throws Exception
{
- if ( index < start || index >= end )
+ pinCursor();
+
+ try
{
- throw new IOException( I18n.err( I18n.ERR_02009_CURSOR_NOT_POSITIONED ) );
- }
+ if ( index < start || index >= end )
+ {
+ throw new IOException(
+ I18n.err( I18n.ERR_02009_CURSOR_NOT_POSITIONED ) );
+ }
+
+ if ( list.get( index ).available() )
+ {
+ return list.get( index ).get();
+ }
- if ( list.get( index ).available() )
+ }
+ finally
{
- return list.get( index ).get();
+ unpinCursor();
}
throw new InvalidCursorPositionException();
@@ -536,34 +521,42 @@ public class CursorList extends Abstract
*/
public void close( Exception reason ) throws Exception
{
+ pinCursor();
closed = true;
- for ( Cursor<?> c : list )
+ try
{
- try
+ for ( Cursor<?> c : list )
{
- if ( reason != null )
+ try
{
- c.close( reason );
+ if ( reason != null )
+ {
+ c.close( reason );
+ }
+ else
+ {
+ c.close();
+ }
}
- else
+ catch ( Exception e )
{
- c.close();
+ LOG.warn( "Failed to close the cursor" );
}
}
- catch ( Exception e )
+ }
+ finally
+ {
+ if ( reason == null )
{
- LOG.warn( "Failed to close the cursor" );
+ this.endTxnAtClose( false );
+ }
+ else
+ {
+ this.endTxnAtClose( true );
}
- }
- if ( reason == null )
- {
- this.endTxnAtClose( false );
- }
- else
- {
- this.endTxnAtClose( true );
+ unpinCursor();
}
}
@@ -580,6 +573,22 @@ public class CursorList extends Abstract
/**
* {@inheritDoc}
*/
+ public void doLeakedCursorManagement( LeakedCursorManager leakedCursorManager ) throws Exception
+ {
+ pinCursor();
+
+ for ( EntryFilteringCursor cursor : list )
+ {
+ cursor.doLeakedCursorManagement( leakedCursorManager );
+ }
+
+ unpinCursor();
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
public Iterator<Entry> iterator()
{
throw new UnsupportedOperationException();
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java Fri Jun 29 08:25:25 2012
@@ -23,6 +23,7 @@ package org.apache.directory.server.core
import java.util.List;
import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.server.core.api.txn.TxnHandle;
import org.apache.directory.server.core.api.txn.TxnManager;
import org.apache.directory.shared.ldap.model.cursor.Cursor;
@@ -98,4 +99,36 @@ public interface EntryFilteringCursor ex
* @return the associated transaction to this cursor
*/
TxnHandle getTransaction();
+
+
+ /**
+ * Ensure exclusive access to the cursor state. A thread might
+ * call this method multiple times without unpinning the cursor,
+ * so it should be reentrant.
+ */
+ void pinCursor();
+
+
+ /**
+ * Release pin gotten by pinCursor
+ */
+ void unpinCursor();
+
+
+ /**
+ * do the leak cursor management
+ */
+ void doLeakedCursorManagement( LeakedCursorManager leakedCursorManager ) throws Exception;
+
+ /**
+ * Set creation timestamp
+ * @param timestamp creation timestamp
+ */
+ void setTimestamp( long timestamp );
+
+ /**
+ *
+ * @return the creation timestamp
+ */
+ long getTimestamp();
}
\ No newline at end of file
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java Fri Jun 29 08:25:25 2012
@@ -142,6 +142,6 @@ public class LogAnchor implements Extern
*/
public String toString()
{
- return "File number: " + logFileNumber + ", offset: " + logFileOffset + ", LSN: " + Long.toHexString( logLSN );
+ return "File number: " + logFileNumber + ", offset: " + logFileOffset + ", LSN: " + logLSN;
}
}
Added: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java?rev=1355264&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java Fri Jun 29 08:25:25 2012
@@ -0,0 +1,14 @@
+package org.apache.directory.server.core.api.txn;
+
+import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.shared.ldap.model.cursor.Cursor;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+
+
+public interface LeakedCursorManager
+{
+ Cursor<Entry> createLeakedCursor( EntryFilteringCursor cursor ) throws Exception;
+
+
+ void trackCursor( EntryFilteringCursor cursor );
+}
Modified: directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java Fri Jun 29 08:25:25 2012
@@ -42,6 +42,7 @@ import org.apache.directory.server.core.
import org.apache.directory.server.core.api.schema.SchemaPartition;
import org.apache.directory.server.core.api.subtree.SubentryCache;
import org.apache.directory.server.core.api.subtree.SubtreeEvaluator;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.server.core.api.txn.TxnLogManager;
import org.apache.directory.server.core.api.txn.TxnManager;
import org.apache.directory.shared.ldap.codec.api.LdapApiService;
@@ -617,6 +618,11 @@ public class MockDirectoryService implem
return null;
}
+ public LeakedCursorManager getLeakedCursorManager()
+ {
+ return null;
+ }
+
public TxnLogManager getTxnLogManager()
{
Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java Fri Jun 29 08:25:25 2012
@@ -168,7 +168,6 @@ import org.apache.directory.server.i18n.
}
initialLsn = logRecord.getLogAnchor().getLogLSN();
- System.out.println(" Log manager inital lsn " + initialLsn);
long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
long lastGoodLogFileOffset = scanner.getLastGoodOffset();
Added: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java?rev=1355264&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java Fri Jun 29 08:25:25 2012
@@ -0,0 +1,224 @@
+package org.apache.directory.server.core.shared.txn;
+
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
+import org.apache.directory.server.core.shared.txn.DefaultTxnManager.LogSyncer;
+import org.apache.directory.shared.ldap.model.cursor.Cursor;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class DefaultLeakedCursorManager implements LeakedCursorManager
+{
+ /** List of tracked cursors */
+ private ConcurrentLinkedQueue<EntryFilteringCursor> cursors = new ConcurrentLinkedQueue<EntryFilteringCursor>();
+
+ private static int LEAK_CHECK_INTERVAL = 1000;
+
+ private static int LEAK_TIMEOUT = 1000;
+
+ private static final String CURSOR_SUFFIX = "cursor";
+
+ private String cursorFolderPath;
+
+ private AtomicInteger idx = new AtomicInteger( 0 );
+
+ CursorChecker checker;
+
+
+ public DefaultLeakedCursorManager( String folderPath )
+ {
+ cursorFolderPath = folderPath;
+
+ File folder = new File( cursorFolderPath );
+ folder.mkdirs();
+ }
+
+
+ public void init()
+ {
+ if ( checker == null )
+ {
+ checker = new CursorChecker();
+ checker.setDaemon( true );
+ checker.start();
+ }
+ }
+
+
+ public void shutdown()
+ {
+ checker.interrupt();
+
+ try
+ {
+ checker.join();
+ }
+ catch ( InterruptedException e )
+ {
+ //Ignore
+ }
+
+ cursors.clear();
+ }
+
+
+ public Cursor<Entry> createLeakedCursor( EntryFilteringCursor cursor ) throws Exception
+ {
+ File cursorFile = makeCursorFileName();
+
+ cursorFile.createNewFile();
+
+ RandomAccessFile raf = new RandomAccessFile( cursorFile, "rw" );
+
+ try
+ {
+ raf.setLength( 0 );
+ raf.getFD().sync();
+ }
+ finally
+ {
+ raf.close();
+ }
+
+ boolean canMoveBeforeFirst = false;
+
+ if ( cursor.previous() )
+ {
+ cursor.next();
+ }
+ else
+ {
+ canMoveBeforeFirst = true;
+ }
+
+
+ return new RandomFileCursor( cursorFile, cursor, canMoveBeforeFirst,
+ cursor.getOperationContext().getSession().getDirectoryService().getSchemaManager() );
+ }
+
+
+ public void trackCursor( EntryFilteringCursor cursor )
+ {
+ cursors.add( cursor );
+ }
+
+
+ private File makeCursorFileName()
+ {
+ int fileIdx = idx.incrementAndGet();
+
+ return new File( cursorFolderPath + File.separatorChar + fileIdx + "."
+ + CURSOR_SUFFIX );
+ }
+
+
+ private void checkLeakedCursors() throws Exception
+ {
+ Iterator<EntryFilteringCursor> it = cursors.iterator();
+ EntryFilteringCursor cursor;
+ long currentTimestamp = System.currentTimeMillis();
+
+ while ( it.hasNext() )
+ {
+ cursor = it.next();
+
+ if ( cursor.isClosed() )
+ {
+ it.remove();
+ continue;
+ }
+
+ if ( ( currentTimestamp - cursor.getTimestamp() ) >= LEAK_TIMEOUT )
+ {
+ cursor.pinCursor();
+
+ if ( cursor.isClosed() )
+ {
+ cursor.unpinCursor();
+ it.remove();
+ continue;
+ }
+
+ System.out.println("Doing leaked cursor management2" + cursor);
+
+ try
+ {
+ cursor.doLeakedCursorManagement( this );
+ }
+ finally
+ {
+ cursor.unpinCursor();
+ }
+
+ it.remove();
+ continue;
+
+ }
+ else
+ {
+ // Maybe break here
+ }
+
+ }
+ }
+
+ class CursorChecker extends Thread
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ while ( true )
+ {
+ Thread.sleep( LEAK_CHECK_INTERVAL );
+
+ try
+ {
+ checkLeakedCursors();
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ // Bail out
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+}
Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java Fri Jun 29 08:25:25 2012
@@ -171,6 +171,8 @@ class DefaultTxnManager implements TxnMa
lastFlushedLogAnchor = new LogAnchor();
initialScanPoint = wal.getCheckPoint();
+ //System.out.println("checkpoint " + initialScanPoint);
+
lastFlushedLogAnchor.resetLogAnchor( initialScanPoint );
dummyTxn.commitTxn( initialScanPoint.getLogLSN() );
@@ -191,7 +193,7 @@ class DefaultTxnManager implements TxnMa
public void shutdown()
{
- System.out.println("in shutdown");
+ //System.out.println("in shutdown");
syncer.interrupt();
try
@@ -212,11 +214,13 @@ class DefaultTxnManager implements TxnMa
ReadWriteTxn latestCommitted = latestCommittedTxn.get();
long latestFlushedLsn = latestFlushedTxnLSN.get();
- System.out.println("latest committed txn " + latestCommitted.getCommitTime() +
- " latest flushed " + latestFlushedLsn);
- //flushTxns();
+ flushTxns();
+
+ advanceCheckPoint( lastFlushedLogAnchor );
- //advanceCheckPoint( lastFlushedLogAnchor );
+ // System.out.println("latest committed txn " + latestCommitted.getCommitTime() +
+ // " latest flushed " + latestFlushedLsn + " last flushed log anchor " +
+ // lastFlushedLogAnchor );
}
catch ( Exception e )
{
@@ -1004,7 +1008,7 @@ class DefaultTxnManager implements TxnMa
UserLogRecord logRecord = new UserLogRecord();
byte userRecord[];
- System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
+ //System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
try
{
@@ -1023,7 +1027,7 @@ class DefaultTxnManager implements TxnMa
if ( stateChange.getTxnState() == ChangeState.TXN_COMMIT )
{
- System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe recovered txns");
+ //System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe recovered txns");
txnsToRecover.add( new Long( stateChange.getTxnID() ) );
}
}
@@ -1043,7 +1047,7 @@ class DefaultTxnManager implements TxnMa
{
Dn partitionSuffix = partition.getSuffixDn();
- System.out.println("Recover partition " + partitionSuffix);
+ //System.out.println("Recover partition " + partitionSuffix);
LogScanner logScanner = wal.beginScan( initialScanPoint );
UserLogRecord logRecord = new UserLogRecord();
@@ -1065,8 +1069,8 @@ class DefaultTxnManager implements TxnMa
DataChangeContainer dataChangeContainer = new DataChangeContainer();
dataChangeContainer.readExternal(in);
- System.out.println("Data change container for " + dataChangeContainer.getPartitionDn() +
- " txn id " + dataChangeContainer.getTxnID() );
+ //System.out.println("Data change container for " + dataChangeContainer.getPartitionDn() +
+ // " txn id " + dataChangeContainer.getTxnID() );
// If this change is for the partition we are tyring to recover
// and belongs to a txn that committed, then
Added: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java?rev=1355264&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java (added)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java Fri Jun 29 08:25:25 2012
@@ -0,0 +1,299 @@
+package org.apache.directory.server.core.shared.txn;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.RandomAccessFile;
+
+import org.apache.directory.server.core.api.entry.ClonedServerEntry;
+import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.shared.i18n.I18n;
+import org.apache.directory.shared.ldap.model.cursor.AbstractCursor;
+import org.apache.directory.shared.ldap.model.cursor.InvalidCursorPositionException;
+import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+import org.apache.directory.shared.ldap.model.schema.SchemaManager;
+
+
+public class RandomFileCursor extends AbstractCursor<Entry>
+{
+ /* Path to the file containing entries */
+ private File file;
+
+ /* Size of the previous entry(if a next is done). Used to go back on the cursor */
+ private int lastEntrySize;
+
+ /* Current offset into the file */
+ private int currentOffset;
+
+ /* Prefetched entry */
+ private Entry prefetched;
+
+ /* Whether cursor can move to beforefirst */
+ private boolean canMoveBeforeFirst;
+
+ private SchemaManager schemaManager;
+
+
+ RandomFileCursor( File file, EntryFilteringCursor cursor, boolean canMoveBeforeFirst,
+ SchemaManager schemaManager ) throws Exception
+ {
+ this.file = file;
+ this.canMoveBeforeFirst = canMoveBeforeFirst;
+ this.schemaManager = schemaManager;
+
+ RandomAccessFile raf = new RandomAccessFile( file, "rw" );
+
+ Entry entry;
+
+ if ( cursor.available() )
+ {
+ entry = cursor.get();
+ prefetched = (( ClonedServerEntry )entry ).getClonedEntry();
+ lastEntrySize = writeEntry( raf, entry );
+ currentOffset = lastEntrySize + 4;
+ }
+
+ try
+ {
+ while ( cursor.next() )
+ {
+ entry = cursor.get();
+ writeEntry( raf, entry );
+
+ }
+ }
+ finally
+ {
+ raf.close();
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean available()
+ {
+ return prefetched != null;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean next() throws Exception
+ {
+ RandomAccessFile raf = new RandomAccessFile( file, "r" );
+ byte[] data;
+ int length;
+
+ ObjectInputStream in = null;
+ ByteArrayInputStream bin = null;
+
+ try
+ {
+ if ( currentOffset >= raf.length() )
+ return false;
+
+ raf.seek( currentOffset );
+ length = raf.readInt();
+
+ data = new byte[length];
+ raf.read( data, 0, length );
+
+ bin = new ByteArrayInputStream( data );
+ in = new ObjectInputStream( bin );
+
+ prefetched = new DefaultEntry();
+ prefetched.readExternal( in );
+
+ lastEntrySize = length;
+ currentOffset += 4 + length;
+ }
+ finally
+ {
+ if ( bin != null )
+ {
+ bin.close();
+ }
+
+ if ( in != null )
+ {
+ in.close();
+ }
+
+ raf.close();
+ }
+
+ return true;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean previous() throws Exception
+ {
+ if ( lastEntrySize == 0 )
+ {
+ return false;
+ }
+
+ if ( currentOffset < ( lastEntrySize + 4 ) )
+ {
+ throw new IllegalStateException( "RandomFileCursor currenOffset: " + currentOffset + " lastEntrySize " +
+ lastEntrySize );
+ }
+
+ currentOffset -= lastEntrySize + 4;
+ next();
+ currentOffset -= lastEntrySize + 4;
+ lastEntrySize = 0;
+ return true;
+
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public Entry get() throws Exception
+ {
+ if ( available() )
+ {
+ return new ClonedServerEntry( schemaManager, prefetched );
+ }
+
+ throw new InvalidCursorPositionException();
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void after( Entry entry ) throws Exception
+ {
+ throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+ .concat( "." ).concat( "after()" ) ) );
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void afterLast() throws Exception
+ {
+ throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+ .concat( "." ).concat( "afterLast()" ) ) );
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean last() throws Exception
+ {
+ throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+ .concat( "." ).concat( "last()" ) ) );
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean first() throws Exception
+ {
+ beforeFirst();
+ return next();
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void beforeFirst() throws Exception
+ {
+ if ( !canMoveBeforeFirst )
+ {
+ throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass()
+ .getName()
+ .concat( "." ).concat( "beforeFirst()" ) ) );
+ }
+ else
+ {
+ currentOffset = 0;
+ lastEntrySize = 0;
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void before( Entry entry ) throws Exception
+ {
+ throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+ .concat( "." ).concat( "beforeEntry()" ) ) );
+ }
+
+
+ private int writeEntry( RandomAccessFile raf, Entry entry ) throws Exception
+ {
+ entry = ( ( ClonedServerEntry )entry ).getOriginalEntry();
+ byte[] data;
+
+ ObjectOutputStream out = null;
+ ByteArrayOutputStream bout = null;
+ try
+ {
+ bout = new ByteArrayOutputStream();
+ out = new ObjectOutputStream( bout );
+ entry.writeExternal( out );
+
+ out.flush();
+ data = bout.toByteArray();
+ }
+ finally
+ {
+ if ( bout != null )
+ {
+ bout.close();
+ }
+
+ if ( out != null )
+ {
+ out.close();
+ }
+ }
+
+ raf.writeInt( data.length );
+ raf.write( data, 0, data.length );
+
+ return data.length;
+ }
+}
Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java Fri Jun 29 08:25:25 2012
@@ -20,10 +20,12 @@
package org.apache.directory.server.core.shared.txn;
+import java.io.File;
import java.io.IOException;
import org.apache.directory.server.core.api.log.InvalidLogException;
import org.apache.directory.server.core.api.log.Log;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.server.core.api.txn.TxnLogManager;
import org.apache.directory.server.core.api.txn.TxnManager;
import org.apache.directory.server.core.shared.log.DefaultLog;
@@ -41,6 +43,9 @@ public class TxnManagerFactory
/** The only txn log manager */
private TxnLogManagerInternal txnLogManager;
+ /** The only leaked cursor manager */
+ private LeakedCursorManager leakedCursorManager;
+
/** WAL */
private Log log;
@@ -79,6 +84,8 @@ public class TxnManagerFactory
txnLogManager = new DefaultTxnLogManager( log, this );
+ leakedCursorManager = new DefaultLeakedCursorManager( logFolderPath + File.separatorChar + "cursors" );
+
this.init();
}
@@ -100,6 +107,8 @@ public class TxnManagerFactory
}
( ( DefaultTxnManager ) txnManager ).init(txnLogManager);
+
+ ( ( DefaultLeakedCursorManager )leakedCursorManager ).init();
inited = true;
}
@@ -114,6 +123,7 @@ public class TxnManagerFactory
( ( DefaultTxnManager ) txnManager ).shutdown();
( ( DefaultTxnLogManager ) txnLogManager ).shutdown();
+ ( ( DefaultLeakedCursorManager )leakedCursorManager ).shutdown();
inited = false;
}
@@ -130,6 +140,12 @@ public class TxnManagerFactory
}
+ public LeakedCursorManager leakedCursorManagerInstance()
+ {
+ return leakedCursorManager;
+ }
+
+
TxnManagerInternal txnManagerInternalInstance()
{
return txnManager;
Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java Fri Jun 29 08:25:25 2012
@@ -59,11 +59,11 @@ public class UserLogRecordTest
{
ObjectInputStream oIn = null;
ByteArrayInputStream in = new ByteArrayInputStream( buffer );
-
try
{
oIn = new ObjectInputStream( in );
-
+ oIn.read();
+
return oIn;
}
catch ( IOException ioe )
Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java Fri Jun 29 08:25:25 2012
@@ -81,6 +81,7 @@ import org.apache.directory.server.core.
import org.apache.directory.server.core.api.schema.SchemaPartition;
import org.apache.directory.server.core.api.subtree.SubentryCache;
import org.apache.directory.server.core.api.subtree.SubtreeEvaluator;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
import org.apache.directory.server.core.api.txn.TxnLogManager;
import org.apache.directory.server.core.api.txn.TxnManager;
import org.apache.directory.server.core.authn.AuthenticationInterceptor;
@@ -1179,14 +1180,6 @@ public class DefaultDirectoryService imp
do
{
- //TODO TODO
- // THE followign revert was done in one txn. However, when then number
- // of changes got close to 1000, it got really slow. For now doing this
- // in small txns, but identify the cause of this perf problem.
-
- //txnManager.beginTransaction( false );
-
- boolean startedTxn = false;
List<ChangeLogEvent> events = new LinkedList();
try
@@ -1215,7 +1208,6 @@ public class DefaultDirectoryService imp
}
Iterator<ChangeLogEvent> it = events.iterator();
- boolean inTxn = false;
try
{
@@ -2515,6 +2507,15 @@ public class DefaultDirectoryService imp
{
return txnManagerFactory.txnLogManagerInstance();
}
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public LeakedCursorManager getLeakedCursorManager()
+ {
+ return txnManagerFactory.leakedCursorManagerInstance();
+ }
/**
Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java (original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java Fri Jun 29 08:25:25 2012
@@ -1864,6 +1864,9 @@ public class DefaultOperationManager imp
cursor.setTxnManager( txnManager );
+ cursor.setTimestamp( System.currentTimeMillis() );
+ directoryService.getLeakedCursorManager().trackCursor( cursor );
+
txnManager.endLogicalDataRead();
txnManager.setCurTxn( null );
}
Modified: directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java (original)
+++ directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java Fri Jun 29 08:25:25 2012
@@ -286,7 +286,7 @@ public class PagedSearchIT extends Abstr
try
{
list = ctx.search( "dc=users,ou=system", "(cn=*)", controls );
-
+
while ( list.hasMore() )
{
SearchResult result = list.next();