You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2012/10/16 15:27:20 UTC
svn commit: r1398782 [1/2] - in /directory/apacheds/trunk:
core-api/src/main/java/org/apache/directory/server/core/api/event/
core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/
core-jndi/src/main/java/org/apache/directory/...
Author: kayyagari
Date: Tue Oct 16 13:27:19 2012
New Revision: 1398782
URL: http://svn.apache.org/viewvc?rev=1398782&view=rev
Log:
first drop of changes for supporting MMR
Modified:
directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java
directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java
directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java
directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java
directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java
directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java
directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java
directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java
directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java
directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java
directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java
directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java
Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java Tue Oct 16 13:27:19 2012
@@ -81,4 +81,12 @@ public interface DirectoryListener
* @param moveAndRenameContext the move/rename operation context responsible for the change
*/
void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext );
+
+
+ /**
+ * indicates if this listener needs to be invoked synchronously
+ *
+ * @return true if should be invoked synchronously, false otherwise
+ */
+ boolean isSynchronous();
}
Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java Tue Oct 16 13:27:19 2012
@@ -35,12 +35,13 @@ import org.apache.directory.shared.ldap.
*/
public enum EventType
{
- ADD(1),
- DELETE(2),
- MODIFY(4),
- RENAME(8),
- MOVE(16);
-
+ ADD(1),
+ DELETE(2),
+ MODIFY(4),
+ RENAME(8),
+ MOVE(16),
+ MOVE_AND_RENAME(24);
+
public static final int ALL_EVENT_TYPES_MASK = getAllEventTypesMask();
public static final int MOVE_AND_RENAME_MASK = MOVE.mask | RENAME.mask;
private static final EventType[] EMPTY_EVENT_ARRAY = new EventType[0];
Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java Tue Oct 16 13:27:19 2012
@@ -43,7 +43,22 @@ public abstract class AbstractChangeOper
/** The modified Entry as it will be stored into the backend */
protected Entry modifiedEntry;
-
+ /** flag to indicate if this context is carrying a replicated entry */
+ private boolean replEvent;
+
+ /** the rid present in the cookie received from a replication consumer */
+ private int rid = -1; // default value, an invalid rid
+
+ /** a flag to indicate when we don't want a replication event to be generated after this operation */
+ private boolean generateNoReplEvt;
+
+ /**
+ * flag to tell if this context needs to be sent to the event interceptor manually.
+ * This is used only internally where certain modifications do not go through the event
+ * interceptor.
+ */
+ private boolean pushToEvtIntrcptor;
+
/**
*
* Creates a new instance of AbstractChangeOperationContext.
@@ -117,4 +132,83 @@ public abstract class AbstractChangeOper
{
return logChange != LogChange.FALSE;
}
+
+
+ /**
+ * @return true if this context is containing a replication event
+ */
+ public boolean isReplEvent()
+ {
+ return replEvent;
+ }
+
+
+ /**
+ * @param replEvent mark the context as containing a replication event
+ */
+ public void setReplEvent( boolean replEvent )
+ {
+ this.replEvent = replEvent;
+ }
+
+
+ /**
+ * @return the replica ID received from a consumer
+ */
+ public int getRid()
+ {
+ return rid;
+ }
+
+
+ /**
+ * sets the replica ID received from a consumer
+ * @param rid
+ */
+ public void setRid( int rid )
+ {
+ this.rid = rid;
+ }
+
+
+ /**
+ * @return true if a replication event shouldn't be generated for the changes
+ * done using this operation context, false otherwise
+ */
+ public boolean isGenerateNoReplEvt()
+ {
+ return generateNoReplEvt;
+ }
+
+
+ /**
+ * sets whether to suppress the generation of replication event messages after an operation
+ * using this operation context completes (true means no replication event is generated)
+ *
+ * @param generateNoReplEvt
+ */
+ public void setGenerateNoReplEvt( boolean generateNoReplEvt )
+ {
+ this.generateNoReplEvt = generateNoReplEvt;
+ }
+
+
+ /**
+ * @return true if this context needs to be pushed to the event interceptor from nexus
+ */
+ public boolean isPushToEvtIntrcptor()
+ {
+ return pushToEvtIntrcptor;
+ }
+
+
+ /**
+ * sets if this context needs to be pushed to the event interceptor from nexus
+ *
+ * @param pushToEvtIntrcptor
+ */
+ public void setPushToEvtIntrcptor( boolean pushToEvtIntrcptor )
+ {
+ this.pushToEvtIntrcptor = pushToEvtIntrcptor;
+ }
}
Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java Tue Oct 16 13:27:19 2012
@@ -54,6 +54,8 @@ public class LookupOperationContext exte
/** A flag set to true if the user has requested no attribute to be returned */
private Boolean noAttribute;
+ /** flag to indicate if this search is done for replication */
+ private boolean syncreplLookup;
/**
*
@@ -284,8 +286,28 @@ public class LookupOperationContext exte
return "Lookup";
}
+
+ /**
+ * @return true if this is a syncrepl specific search
+ */
+ public boolean isSyncreplLookup()
+ {
+ return syncreplLookup;
+ }
+
/**
+ * sets the flag to indicate if this is a syncrepl specific search or not
+ *
+ * @param syncreplLookup
+ */
+ public void setSyncreplLookup( boolean syncreplLookup )
+ {
+ this.syncreplLookup = syncreplLookup;
+ }
+
+
+ /**
* @see Object#toString()
*/
public String toString()
Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java Tue Oct 16 13:27:19 2012
@@ -52,6 +52,9 @@ public class SearchOperationContext exte
private ExprNode filter;
+ /** flag to indicate if this search is done for replication */
+ private boolean syncreplSearch;
+
/**
* Creates a new instance of SearchOperationContext.
*/
@@ -213,4 +216,24 @@ public class SearchOperationContext exte
{
return MessageTypeEnum.SEARCH_REQUEST.name();
}
+
+
+ /**
+ * @return true if this is a syncrepl specific search
+ */
+ public boolean isSyncreplSearch()
+ {
+ return syncreplSearch;
+ }
+
+
+ /**
+ * sets the flag to indicate if this is a syncrepl specific search or not
+ *
+ * @param syncreplSearch
+ */
+ public void setSyncreplSearch( boolean syncreplSearch )
+ {
+ this.syncreplSearch = syncreplSearch;
+ }
}
Modified: directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java (original)
+++ directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java Tue Oct 16 13:27:19 2012
@@ -80,6 +80,13 @@ public class EventListenerAdapter implem
this.source = source;
this.listener = listener;
}
+
+
+ @Override
+ public boolean isSynchronous()
+ {
+ return false; // always asynchronous
+ }
private void deliverNamingExceptionEvent( Exception e )
Modified: directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java (original)
+++ directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java Tue Oct 16 13:27:19 2012
@@ -92,6 +92,12 @@ public class KeyDerivationInterceptor ex
*/
public void add( AddOperationContext addContext ) throws LdapException
{
+ if ( addContext.isReplEvent() )
+ {
+ next( addContext );
+ return;
+ }
+
Dn normName = addContext.getDn();
Entry entry = addContext.getEntry();
@@ -147,6 +153,12 @@ public class KeyDerivationInterceptor ex
*/
public void modify( ModifyOperationContext modContext ) throws LdapException
{
+ if( modContext.isReplEvent() )
+ {
+ next( modContext );
+ return;
+ }
+
ModifySubContext subContext = new ModifySubContext();
detectPasswordModification( modContext, subContext );
Modified: directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java Tue Oct 16 13:27:19 2012
@@ -313,7 +313,8 @@ public class AuthenticationInterceptor e
Entry entry = addContext.getEntry();
- if ( !directoryService.isPwdPolicyEnabled() )
+
+ if ( !directoryService.isPwdPolicyEnabled() || addContext.isReplEvent() )
{
next( addContext );
return;
@@ -594,7 +595,8 @@ public class AuthenticationInterceptor e
bindModCtx.setDn( dn );
bindModCtx.setEntry( userEntry );
bindModCtx.setModItems( mods );
-
+ bindModCtx.setPushToEvtIntrcptor( true );
+
directoryService.getPartitionNexus().modify( bindModCtx );
}
}
@@ -674,7 +676,8 @@ public class AuthenticationInterceptor e
bindModCtx.setDn( dn );
bindModCtx.setEntry( userEntry );
bindModCtx.setModItems( mods );
-
+ bindModCtx.setPushToEvtIntrcptor( true );
+
directoryService.getPartitionNexus().modify( bindModCtx );
}
@@ -830,7 +833,8 @@ public class AuthenticationInterceptor e
checkAuthenticated( modifyContext );
- if ( !directoryService.isPwdPolicyEnabled() )
+
+ if ( ! directoryService.isPwdPolicyEnabled() || modifyContext.isReplEvent() )
{
next( modifyContext );
invalidateAuthenticatorCaches( modifyContext.getDn() );
Modified: directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java Tue Oct 16 13:27:19 2012
@@ -146,6 +146,12 @@ public class CollectiveAttributeIntercep
{
Entry result = next( lookupContext );
+ // do not add collective attributes
+ if( lookupContext.isSyncreplLookup() )
+ {
+ return result;
+ }
+
// Adding the collective attributes if any
if ( ( lookupContext.getAttrsId() == null ) || ( lookupContext.getAttrsId().size() == 0 ) )
{
@@ -178,7 +184,11 @@ public class CollectiveAttributeIntercep
{
EntryFilteringCursor cursor = next( searchContext );
- cursor.addEntryFilter( SEARCH_FILTER );
+ // only add collective attributes for non-syncrepl search
+ if( !searchContext.isSyncreplSearch() )
+ {
+ cursor.addEntryFilter( SEARCH_FILTER );
+ }
return cursor;
}
Modified: directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java Tue Oct 16 13:27:19 2012
@@ -107,58 +107,93 @@ public class EventInterceptor extends Ba
switch ( type )
{
case ADD:
- executor.execute( new Runnable()
+ if ( listener.isSynchronous() )
{
- public void run()
+ listener.entryAdded( ( AddOperationContext ) opContext );
+ }
+ else
+ {
+ executor.execute( new Runnable()
{
- listener.entryAdded( ( AddOperationContext ) opContext );
- }
- } );
-
+ public void run()
+ {
+ listener.entryAdded( ( AddOperationContext ) opContext );
+ }
+ } );
+ }
+
break;
-
+
case DELETE:
- executor.execute( new Runnable()
+ if ( listener.isSynchronous() )
{
- public void run()
+ listener.entryDeleted( ( DeleteOperationContext ) opContext );
+ }
+ else
+ {
+ executor.execute( new Runnable()
{
- listener.entryDeleted( ( DeleteOperationContext ) opContext );
- }
- } );
-
+ public void run()
+ {
+ listener.entryDeleted( ( DeleteOperationContext ) opContext );
+ }
+ } );
+ }
+
break;
-
+
case MODIFY:
- executor.execute( new Runnable()
+ if ( listener.isSynchronous() )
{
- public void run()
+ listener.entryModified( ( ModifyOperationContext ) opContext );
+ }
+ else
+ {
+ executor.execute( new Runnable()
{
- listener.entryModified( ( ModifyOperationContext ) opContext );
- }
- } );
-
+ public void run()
+ {
+ listener.entryModified( ( ModifyOperationContext ) opContext );
+ }
+ } );
+ }
+
break;
-
+
case MOVE:
- executor.execute( new Runnable()
+ if ( listener.isSynchronous() )
{
- public void run()
+ listener.entryMoved( ( MoveOperationContext ) opContext );
+ }
+ else
+ {
+ executor.execute( new Runnable()
{
- listener.entryMoved( ( MoveOperationContext ) opContext );
- }
- } );
-
+ public void run()
+ {
+ listener.entryMoved( ( MoveOperationContext ) opContext );
+ }
+ } );
+ }
+
break;
case RENAME:
- executor.execute( new Runnable()
+ if ( listener.isSynchronous() )
{
- public void run()
+ listener.entryRenamed( ( RenameOperationContext ) opContext );
+ }
+ else
+ {
+ executor.execute( new Runnable()
{
- listener.entryRenamed( ( RenameOperationContext ) opContext );
- }
- } );
-
+ public void run()
+ {
+ listener.entryRenamed( ( RenameOperationContext ) opContext );
+ }
+ } );
+ }
+
break;
}
}
@@ -220,7 +255,11 @@ public class EventInterceptor extends Ba
List<RegistrationEntry> selecting = getSelectingRegistrations( modifyContext.getDn(), oriEntry );
- next( modifyContext );
+ // modification was already done when this flag is turned on, move to sending the events
+ if( !modifyContext.isPushToEvtIntrcptor() )
+ {
+ next( modifyContext );
+ }
if ( selecting.isEmpty() )
{
Modified: directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java Tue Oct 16 13:27:19 2012
@@ -36,6 +36,7 @@ import org.apache.directory.server.core.
import org.apache.directory.server.core.api.interceptor.BaseInterceptor;
import org.apache.directory.server.core.api.interceptor.Interceptor;
import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
import org.apache.directory.server.core.api.interceptor.context.ListOperationContext;
import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
@@ -440,6 +441,10 @@ public class OperationalAttributeInterce
Entry modifiedEntry = moveContext.getOriginalEntry().clone();
modifiedEntry.put( SchemaConstants.MODIFIERS_NAME_AT, getPrincipal( moveContext ).getName() );
modifiedEntry.put( SchemaConstants.MODIFY_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
+
+ Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+ modifiedEntry.put( csnAt );
+
modifiedEntry.setDn( moveContext.getNewDn() );
moveContext.setModifiedEntry( modifiedEntry );
@@ -456,6 +461,10 @@ public class OperationalAttributeInterce
modifiedEntry.put( SchemaConstants.MODIFIERS_NAME_AT, getPrincipal( moveAndRenameContext ).getName() );
modifiedEntry.put( SchemaConstants.MODIFY_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
modifiedEntry.setDn( moveAndRenameContext.getNewDn() );
+
+ Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+ modifiedEntry.put( csnAt );
+
moveAndRenameContext.setModifiedEntry( modifiedEntry );
next( moveAndRenameContext );
@@ -474,6 +483,10 @@ public class OperationalAttributeInterce
Entry modifiedEntry = renameContext.getOriginalEntry().clone();
modifiedEntry.put( SchemaConstants.MODIFIERS_NAME_AT, getPrincipal( renameContext ).getName() );
modifiedEntry.put( SchemaConstants.MODIFY_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
+
+ Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+ modifiedEntry.put( csnAt );
+
renameContext.setModifiedEntry( modifiedEntry );
next( renameContext );
@@ -498,12 +511,22 @@ public class OperationalAttributeInterce
return cursor;
}
- cursor.addEntryFilter( SEARCH_FILTER );
-
return cursor;
}
+ @Override
+ public void delete( DeleteOperationContext deleteContext ) throws LdapException
+ {
+ // insert a new CSN into the entry, this is for replication
+ Entry entry = deleteContext.getEntry();
+ Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+ entry.put( csnAt );
+
+ next( deleteContext );
+ }
+
+
/**
* Filters out the operational attributes within a search results attributes. The attributes are directly
* modified.
Modified: directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java Tue Oct 16 13:27:19 2012
@@ -1567,6 +1567,12 @@ public class SubentryInterceptor extends
return cursor;
}
+ // DO NOT hide subentries for replication operations
+ if( searchContext.isSyncreplSearch() )
+ {
+ return cursor;
+ }
+
// for subtree and one level scope we filter
if ( !isSubentryVisible( searchContext ) )
{
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java Tue Oct 16 13:27:19 2012
@@ -43,8 +43,8 @@ public class LdapProtocolUtils
/** the prefix for replicaId value */
public static final String REPLICA_ID_PREFIX = "rid=";
- private static final int REPLICA_ID_PREFIX_LEN = REPLICA_ID_PREFIX.length();
-
+ public static final int REPLICA_ID_PREFIX_LEN = REPLICA_ID_PREFIX.length();
+
/** the prefix for Csn value */
public static final String CSN_PREFIX = "csn=";
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java Tue Oct 16 13:27:19 2012
@@ -72,6 +72,13 @@ public class PersistentSearchListener im
this.psearchControl = ( PersistentSearch ) req.getControls().get( PersistentSearch.OID );
}
+
+ @Override
+ public boolean isSynchronous()
+ {
+ return false; // always asynchronous
+ }
+
public void abandon() throws LdapException
{
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java Tue Oct 16 13:27:19 2012
@@ -79,7 +79,7 @@ public class SearchAbandonListener imple
try
{
- if ( cursor != null )
+ if ( ( cursor != null ) && !cursor.isClosed() )
{
/*
* When this method is called due to an abandon request it
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java Tue Oct 16 13:27:19 2012
@@ -35,11 +35,10 @@ public class ReplicaEventMessage
{
/** The message change type */
private ChangeType changeType;
-
+
/** The entry */
private Entry entry;
-
/**
* Create a new ReplicaEvent instance for a Add/Delete+Modify operation
* @param changeType The change type
@@ -79,10 +78,15 @@ public class ReplicaEventMessage
*/
public boolean isEventOlderThan( String csn ) throws Exception
{
+ if( csn == null )
+ {
+ return false;
+ }
+
String entryCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
-
+
int i = entryCsn.compareTo( csn );
-
- return ( i < 0 );
+
+ return ( i <= 0 );
}
}
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java Tue Oct 16 13:27:19 2012
@@ -126,6 +126,8 @@ public class SyncreplConfiguration imple
/** the X509 certificate trust manager used, default value set to {@link NoVerificationTrustManager} */
private X509TrustManager trustManager = new NoVerificationTrustManager();
+ /** flag to indicate if this node is part of a MMR setup, default value is true */
+ private boolean mmrMode = true;
/**
* Creates a new instance of SyncreplConfiguration
@@ -134,9 +136,7 @@ public class SyncreplConfiguration imple
{
attributes = new HashSet<String>();
attributes.add( SchemaConstants.ALL_USER_ATTRIBUTES );
- attributes.add( Strings.toLowerCase( SchemaConstants.ENTRY_UUID_AT ) );
- attributes.add( Strings.toLowerCase( SchemaConstants.ENTRY_CSN_AT ) );
- attributes.add( Strings.toLowerCase( SchemaConstants.REF_AT ) );
+ attributes.add( Strings.toLowerCase( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES ) );
}
@@ -452,17 +452,6 @@ public class SyncreplConfiguration imple
}
- /** a flag to indicate to store the cookie in a file, default is false
- * NOTE: a value of true indicates that the cookie will be stored
- * on file system, which is useful while testing the consumer
- * without loading config partition
- */
- public boolean isStoreCookieInFile()
- {
- return ( configEntryDn == null );
- }
-
-
/**
* Tells if we chase referrals
* @return true if we chase referrals
@@ -561,4 +550,23 @@ public class SyncreplConfiguration imple
{
this.configEntryDn = configEntryDn;
}
+
+ /**
+ * @return true if this node is part of MMR setup
+ */
+ public boolean isMmrMode()
+ {
+ return mmrMode;
+ }
+
+
+ /**
+ * enable/disable MMR option
+ *
+ * @param mmrMode
+ */
+ public void setMmrMode( boolean mmrMode )
+ {
+ this.mmrMode = mmrMode;
+ }
}
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java Tue Oct 16 13:27:19 2012
@@ -20,10 +20,6 @@
package org.apache.directory.server.ldap.replication.consumer;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -33,13 +29,21 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.collections.map.LRUMap;
import org.apache.directory.ldap.client.api.ConnectionClosedEventListener;
import org.apache.directory.ldap.client.api.LdapNetworkConnection;
import org.apache.directory.ldap.client.api.future.SearchFuture;
import org.apache.directory.server.core.api.CoreSession;
import org.apache.directory.server.core.api.DirectoryService;
+import org.apache.directory.server.core.api.OperationManager;
import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
-import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
+import org.apache.directory.server.ldap.LdapProtocolUtils;
import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
import org.apache.directory.server.ldap.replication.SyncreplConfiguration;
import org.apache.directory.shared.ldap.codec.controls.manageDsaIT.ManageDsaITDecorator;
@@ -53,6 +57,7 @@ import org.apache.directory.shared.ldap.
import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncInfoValueDecorator;
import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncRequestValueDecorator;
import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.model.csn.Csn;
import org.apache.directory.shared.ldap.model.entry.Attribute;
import org.apache.directory.shared.ldap.model.entry.DefaultAttribute;
import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
@@ -117,9 +122,6 @@ public class ReplicationConsumerImpl imp
/** the schema manager */
private SchemaManager schemaManager;
- /** the cookie file */
- private File cookieFile;
-
/** flag to indicate whether the consumer was disconnected */
private boolean disconnected;
@@ -129,14 +131,12 @@ public class ReplicationConsumerImpl imp
/** attributes on which modification should be ignored */
private static final String[] MOD_IGNORE_AT = new String[]
{
- SchemaConstants.ENTRY_UUID_AT,
- SchemaConstants.ENTRY_CSN_AT,
- SchemaConstants.MODIFIERS_NAME_AT,
- SchemaConstants.MODIFY_TIMESTAMP_AT,
- SchemaConstants.CREATE_TIMESTAMP_AT,
- SchemaConstants.CREATORS_NAME_AT,
- SchemaConstants.ENTRY_PARENT_ID_AT
- };
+ SchemaConstants.ENTRY_UUID_AT,
+ SchemaConstants.CREATE_TIMESTAMP_AT,
+ SchemaConstants.CREATORS_NAME_AT,
+ SchemaConstants.ENTRY_PARENT_ID_AT,
+ SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT
+ };
/** A thread used to refresh in refreshOnly mode */
private RefresherThread refreshThread;
@@ -151,12 +151,16 @@ public class ReplicationConsumerImpl imp
private static final Set<AttributeTypeOptions> ENTRY_UUID_ATOP_SET = new HashSet<AttributeTypeOptions>();
private Modification cookieMod;
+
+ private Modification ridMod;
/** AttributeTypes used for replication */
private static AttributeType COOKIE_AT_TYPE;
private static AttributeType ENTRY_UUID_AT;
+ private static AttributeType RID_AT_TYPE;
-
+ private static final Map<String,Object> uuidLockMap = new LRUMap( 1000 );
+
/**
* @return the config
*/
@@ -174,31 +178,22 @@ public class ReplicationConsumerImpl imp
{
this.directoryService = directoryservice;
- if ( config.isStoreCookieInFile() )
- {
- File cookieDir = new File( directoryservice.getInstanceLayout().getRunDirectory(), "cookies" );
- if ( !cookieDir.mkdir() )
- {
- throw new IOException( I18n.err( I18n.ERR_112_COULD_NOT_CREATE_DIRECORY, cookieDir ) );
- }
-
- cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) );
- }
-
session = directoryService.getAdminSession();
schemaManager = directoryservice.getSchemaManager();
ENTRY_UUID_AT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ENTRY_UUID_AT );
COOKIE_AT_TYPE = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE );
-
+ RID_AT_TYPE = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_DS_REPLICA_ID );
+
ENTRY_UUID_ATOP_SET.add( new AttributeTypeOptions( ENTRY_UUID_AT ) );
Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
+ cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
- Modification cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
- this.cookieMod = cookieMod;
-
+ Attribute ridAttr = new DefaultAttribute( RID_AT_TYPE );
+ ridMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, ridAttr );
+
prepareSyncSearchRequest();
}
@@ -211,11 +206,11 @@ public class ReplicationConsumerImpl imp
*/
public boolean connect()
{
+ String providerHost = config.getRemoteHost();
+ int port = config.getRemotePort();
+
try
{
- String providerHost = config.getRemoteHost();
- int port = config.getRemotePort();
-
// Create a connection
if ( connection == null )
{
@@ -246,7 +241,8 @@ public class ReplicationConsumerImpl imp
}
catch ( Exception e )
{
- LOG.error( "Failed to bind with the given bindDN and credentials", e );
+ LOG.error( "Failed to connect to the server {}:{}", providerHost, String.valueOf( port ) );
+ LOG.error( "", e );
}
return false;
@@ -318,62 +314,83 @@ public class ReplicationConsumerImpl imp
try
{
- Entry remoteEntry = syncResult.getEntry();
-
- if ( syncStateCtrl.getCookie() != null )
- {
- syncCookie = syncStateCtrl.getCookie();
- LOG.debug( "assigning the cookie from sync state value control: "
- + Strings.utf8ToString( syncCookie ) );
- }
-
- SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
-
- LOG.debug( "state name {}", state.name() );
+ Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() );
+ String uuid = remoteEntry.get( ENTRY_UUID_AT ).getString();
+ // lock on UUID to serialize the updates when there are multiple consumers
+ // connected to several producers and to the *same* base/partition
+ Object lock = getLockFor( uuid );
+ synchronized ( lock )
+ {
+ int rid = -1;
+
+ if ( syncStateCtrl.getCookie() != null )
+ {
+ syncCookie = syncStateCtrl.getCookie();
+ rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) );
+ LOG.debug( "assigning the cookie from sync state value control: "
+ + Strings.utf8ToString(syncCookie) );
+ }
+
+ SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
+
+ LOG.debug( "state name {}", state.name() );
+
+ // check to avoid conversion of UUID from byte[] to String
+ if ( LOG.isDebugEnabled() )
+ {
+ LOG.debug( "entryUUID = {}", Strings.uuidToString(syncStateCtrl.getEntryUUID()) );
+ }
- // check to avoid conversion of UUID from byte[] to String
- if ( LOG.isDebugEnabled() )
- {
- LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) );
- }
+ Dn remoteDn = remoteEntry.getDn();
switch ( state )
{
case ADD:
- Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
if ( !session.exists( remoteDn ) )
{
LOG.debug( "adding entry with dn {}", remoteDn );
LOG.debug( remoteEntry.toString() );
- session.add( new DefaultEntry( schemaManager, remoteEntry ) );
+ AddOperationContext addContext = new AddOperationContext( session, remoteEntry );
+ addContext.setReplEvent( true );
+ addContext.setRid( rid );
+
+ OperationManager operationManager = directoryService.getOperationManager();
+ operationManager.add( addContext );
}
else
{
LOG.debug( "updating entry in refreshOnly mode {}", remoteDn );
- modify( remoteEntry );
+ modify( remoteEntry, rid );
}
break;
case MODIFY:
LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
- modify( remoteEntry );
+ modify( remoteEntry, rid );
break;
case MODDN:
String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() ).toString();
- applyModDnOperation( remoteEntry, entryUuid );
+ applyModDnOperation( remoteEntry, entryUuid, rid );
break;
case DELETE:
LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
- // incase of a MODDN operation resulting in a branch to be moved out of scope
- // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
- // so the branch needs to be recursively deleted here
- deleteRecursive( remoteEntry.getDn(), null );
+ if ( !session.exists( remoteDn ) )
+ {
+ LOG.debug( "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete", remoteDn );
+ }
+ else
+ {
+ // in case of a MODDN operation resulting in a branch being moved out of scope
+ // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
+ // so the branch needs to be recursively deleted here
+ deleteRecursive( remoteEntry.getDn(), null );
+ }
break;
@@ -387,6 +404,7 @@ public class ReplicationConsumerImpl imp
{
storeCookie();
}
+ }
}
catch ( Exception e )
{
@@ -418,11 +436,16 @@ public class ReplicationConsumerImpl imp
byte[] cookie = syncInfoValue.getCookie();
+ int replicaId = -1;
+
if ( cookie != null )
{
LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) );
CONSUMER_LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) );
syncCookie = cookie;
+
+ String cookieString = Strings.utf8ToString( syncCookie );
+ replicaId = LdapProtocolUtils.getReplicaId( cookieString );
}
LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() );
@@ -433,11 +456,11 @@ public class ReplicationConsumerImpl imp
// present in the syncIdSet
if ( syncInfoValue.isRefreshDeletes() )
{
- deleteEntries( uuidList, false );
+ deleteEntries( uuidList, false, replicaId );
}
else
{
- deleteEntries( uuidList, true );
+ deleteEntries( uuidList, true, replicaId );
}
LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
@@ -446,8 +469,7 @@ public class ReplicationConsumerImpl imp
}
catch ( Exception de )
{
- LOG.error( "Failed to handle syncinfo message" );
- de.printStackTrace();
+ LOG.error( "Failed to handle syncinfo message", de );
}
LOG.debug( ".................... END handleSyncInfo ..............." );
@@ -720,28 +742,22 @@ public class ReplicationConsumerImpl imp
try
{
- if ( config.isStoreCookieInFile() )
- {
- CONSUMER_LOG.debug( "Storingthe cookie in a file : {}", cookieFile );
- FileOutputStream fout = new FileOutputStream( cookieFile );
- fout.write( syncCookie.length );
- fout.write( syncCookie );
- fout.close();
- }
- else
- {
- Attribute attr = cookieMod.getAttribute();
- attr.clear();
- attr.add( syncCookie );
-
- CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
-
- session.modify( config.getConfigEntryDn(), cookieMod );
-
- Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ALL_ATTRIBUTES_ARRAY );
-
- CONSUMER_LOG.debug( "stored entry : {}", entry );
- }
+ Attribute attr = cookieMod.getAttribute();
+ attr.clear();
+ attr.add( syncCookie );
+
+ String cookieString = Strings.utf8ToString( syncCookie );
+ int replicaId = LdapProtocolUtils.getReplicaId( cookieString );
+
+ Attribute ridAt = ridMod.getAttribute();
+ ridAt.clear();
+ ridAt.add( String.valueOf( replicaId ) );
+
+ CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
+
+ //session.modify( config.getConfigEntryDn(), cookieMod, ridMod );
+ session.modify( config.getConfigEntryDn(), cookieMod );
+ CONSUMER_LOG.debug( "stored the cookie in entry {}", config.getConfigEntryDn() );
lastSavedCookie = new byte[syncCookie.length];
System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
@@ -750,7 +766,7 @@ public class ReplicationConsumerImpl imp
}
catch ( Exception e )
{
- LOG.error( "Failed to store the cookie", e );
+ LOG.error( "Failed to store the cookie in consumer entry {}", config.getConfigEntryDn(), e );
}
}
@@ -762,55 +778,28 @@ public class ReplicationConsumerImpl imp
{
try
{
- if ( config.isStoreCookieInFile() )
- {
- CONSUMER_LOG.debug( "The cookie is stored in a file : {}", cookieFile );
-
- if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
- {
- FileInputStream fin = new FileInputStream( cookieFile );
- syncCookie = new byte[fin.read()];
- fin.read( syncCookie );
- fin.close();
-
- lastSavedCookie = new byte[syncCookie.length];
- System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
-
- LOG.debug( "read the cookie from file: " + Strings.utf8ToString( syncCookie ) );
- }
- }
- else
- {
- try
+ Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ALL_ATTRIBUTES_ARRAY );
+
+ CONSUMER_LOG.debug( "The cookie is stored in the DIT : {}", entry );
+
+ if ( entry != null )
+ {
+ Attribute attr = entry.get( COOKIE_AT_TYPE );
+
+ if ( attr != null )
{
- Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ALL_ATTRIBUTES_ARRAY );
-
- CONSUMER_LOG.debug( "The cookie is stored in the DIT : {}", entry );
-
- if ( entry != null )
- {
- Attribute attr = entry.get( COOKIE_AT_TYPE );
-
- if ( attr != null )
- {
- syncCookie = attr.getBytes();
- lastSavedCookie = syncCookie;
- CONSUMER_LOG.debug( "Read cookie : '{}'", attr );
- LOG.debug( "loaded cookie from DIT" );
- }
- }
- }
- catch ( Exception e )
- {
- // can be ignored, most likely happens if there is no entry with the given Dn
- // log in debug mode
- LOG.debug( "Failed to read the cookie from the entry", e );
+ syncCookie = attr.getBytes();
+ lastSavedCookie = syncCookie;
+ CONSUMER_LOG.debug( "Read cookie : '{}'", attr );
+ LOG.debug( "loaded cookie from DIT" );
}
}
}
catch ( Exception e )
{
- LOG.error( "Failed to read the cookie", e );
+ // can be ignored, most likely happens if there is no entry with the given Dn
+ // log in debug mode
+ LOG.debug( "Failed to read the cookie from the entry {}", config.getConfigEntryDn(), e );
}
}
@@ -820,28 +809,17 @@ public class ReplicationConsumerImpl imp
*/
public void removeCookie()
{
- if ( config.isStoreCookieInFile() )
+ try
{
- if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
- {
- boolean deleted = cookieFile.delete();
- LOG.info( "deleted cookie file {}", deleted );
- }
+ Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
+ Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
+ cookieAttr );
+ session.modify( config.getConfigEntryDn(), deleteCookieMod );
}
- else
+ catch ( Exception e )
{
- try
- {
- Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
- Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
- cookieAttr );
- session.modify( config.getConfigEntryDn(), deleteCookieMod );
- }
- catch ( Exception e )
- {
- LOG.warn( "Failed to delete the cookie from the entry with Dn {}", config.getConfigEntryDn() );
- LOG.warn( "{}", e );
- }
+ LOG.warn( "Failed to delete the cookie from the entry with Dn {}", config.getConfigEntryDn() );
+ LOG.warn( "{}", e );
}
LOG.info( "resetting sync cookie" );
@@ -851,7 +829,7 @@ public class ReplicationConsumerImpl imp
}
- private void applyModDnOperation( Entry remoteEntry, String entryUuid ) throws Exception
+ private void applyModDnOperation( Entry remoteEntry, String entryUuid, int rid ) throws Exception
{
LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry );
@@ -863,19 +841,42 @@ public class ReplicationConsumerImpl imp
// Retrieve locally the moved or renamed entry
String filter = "(entryUuid=" + entryUuid + ")";
SearchRequest searchRequest = new SearchRequestImpl();
- searchRequest.setBase( Dn.ROOT_DSE );
+ searchRequest.setBase( new Dn( schemaManager, config.getBaseDn() ) );
searchRequest.setFilter( filter );
searchRequest.setScope( SearchScope.SUBTREE );
searchRequest.addAttributes( "entryUuid", "entryCsn", "*" );
EntryFilteringCursor cursor = session.search( searchRequest );
cursor.beforeFirst();
- cursor.next();
-
- Entry localEntry = cursor.get();
+
+ Entry localEntry = null;
+
+ if ( cursor.next() )
+ {
+ localEntry = cursor.get();
+ }
cursor.close();
+ // can happen in MMR scenario
+ if ( localEntry == null )
+ {
+ return;
+ }
+
+ if ( config.isMmrMode() )
+ {
+ Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+ Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+
+ if ( localCsn.compareTo( remoteCsn ) >= 0 )
+ {
+ // just discard the received modified entry, that is old
+ LOG.debug( "local modification is latest, discarding the modDn operation dn {}", remoteEntry.getDn() );
+ return;
+ }
+ }
+
// Compute the DN, parentDn and Rdn for both entries
Dn localDn = localEntry.getDn();
Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
@@ -909,16 +910,22 @@ public class ReplicationConsumerImpl imp
{
case MOVE:
LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn );
- session.move( localDn, remoteParentDn );
-
+ MoveOperationContext movCtx = new MoveOperationContext( session, localDn, remoteParentDn );
+ movCtx.setReplEvent( true );
+ movCtx.setRid( rid );
+ directoryService.getOperationManager().move( movCtx );
+
break;
case RENAME:
LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}", new String[]
{ localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) } );
- session.rename( localDn, remoteRdn, deleteOldRdn );
-
+ RenameOperationContext renCtx = new RenameOperationContext( session, localDn, remoteRdn, deleteOldRdn );
+ renCtx.setReplEvent( true );
+ renCtx.setRid( rid );
+ directoryService.getOperationManager().rename( renCtx );
+
break;
case MOVE_AND_RENAME:
@@ -931,8 +938,11 @@ public class ReplicationConsumerImpl imp
remoteRdn.getName(),
String.valueOf( deleteOldRdn ) } );
- session.moveAndRename( localDn, remoteParentDn, remoteRdn, deleteOldRdn );
-
+ MoveAndRenameOperationContext movRenCtx = new MoveAndRenameOperationContext( session, localDn, remoteParentDn, remoteRdn, deleteOldRdn );
+ movRenCtx.setReplEvent( true );
+ movRenCtx.setRid( rid );
+ directoryService.getOperationManager().moveAndRename( movRenCtx );
+
break;
}
}
@@ -943,12 +953,30 @@ public class ReplicationConsumerImpl imp
}
- private void modify( Entry remoteEntry ) throws Exception
+ private void modify( Entry remoteEntry, int rid ) throws Exception
{
- Entry localEntry = session.lookup( remoteEntry.getDn() );
-
+ LookupOperationContext lookupCtx = new LookupOperationContext( session, remoteEntry.getDn() );
+ lookupCtx.setAttrsId( config.getAttributes() );
+ lookupCtx.setSyncreplLookup( true );
+
+ Entry localEntry = session.getDirectoryService().getOperationManager().lookup( lookupCtx );
+
+ if ( config.isMmrMode() )
+ {
+ Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+ Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+
+ if ( localCsn.compareTo( remoteCsn ) >= 0 )
+ {
+ // just discard the received modified entry, that is old
+ LOG.debug( "local modification is latest, discarding the modification of dn {}", remoteEntry.getDn() );
+ return;
+ }
+ }
+
remoteEntry.removeAttributes( MOD_IGNORE_AT );
-
+ localEntry.removeAttributes( MOD_IGNORE_AT );
+
List<Modification> mods = new ArrayList<Modification>();
Iterator<Attribute> itr = localEntry.iterator();
@@ -982,7 +1010,19 @@ public class ReplicationConsumerImpl imp
}
}
- session.modify( remoteEntry.getDn(), mods );
+ List<Modification> serverModifications = new ArrayList<Modification>( mods.size() );
+
+ for ( Modification mod : mods )
+ {
+ serverModifications.add( new DefaultModification( directoryService.getSchemaManager(), mod ) );
+ }
+
+ ModifyOperationContext modifyContext = new ModifyOperationContext( session, remoteEntry.getDn(), serverModifications );
+ modifyContext.setReplEvent( true );
+ modifyContext.setRid( rid );
+
+ OperationManager operationManager = directoryService.getOperationManager();
+ operationManager.modify( modifyContext );
}
@@ -990,9 +1030,10 @@ public class ReplicationConsumerImpl imp
* deletes the entries having the UUID given in the list
*
* @param uuidList the list of UUIDs
+ * @param replicaId the replica ID taken from the sync cookie (-1 when the caller has no cookie)
* @throws Exception in case of any problems while deleting the entries
*/
- public void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent ) throws Exception
+ public void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent, int replicaId ) throws Exception
{
if ( uuidList == null || uuidList.isEmpty() )
{
@@ -1010,7 +1051,7 @@ public class ReplicationConsumerImpl imp
if ( isRefreshPresent )
{
LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() );
- _deleteEntries_( uuidList, isRefreshPresent );
+ _deleteEntries_( uuidList, isRefreshPresent, replicaId );
return;
}
@@ -1023,7 +1064,7 @@ public class ReplicationConsumerImpl imp
for ( ; i < count; i++ )
{
startIndex = i * NODE_LIMIT;
- _deleteEntries_( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent );
+ _deleteEntries_( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent, replicaId );
}
if ( ( uuidList.size() % NODE_LIMIT ) != 0 )
@@ -1033,7 +1074,7 @@ public class ReplicationConsumerImpl imp
{
startIndex = i * NODE_LIMIT;
}
- _deleteEntries_( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent );
+ _deleteEntries_( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId );
}
}
@@ -1043,8 +1084,9 @@ public class ReplicationConsumerImpl imp
*
* @param limitedUuidList a list of UUIDs whose size is less than or equal to #NODE_LIMIT (node limit applies only for refreshDeletes list)
* @param isRefreshPresent a flag indicating the type of entries present in the UUID list
+ * @param replicaId the replica ID handed down by deleteEntries (-1 when no cookie was available)
*/
- private void _deleteEntries_( List<byte[]> limitedUuidList, boolean isRefreshPresent ) throws Exception
+ private void _deleteEntries_( List<byte[]> limitedUuidList, boolean isRefreshPresent, int replicaId ) throws Exception
{
ExprNode filter = null;
int size = limitedUuidList.size();
@@ -1094,6 +1136,7 @@ public class ReplicationConsumerImpl imp
AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET );
cursor.beforeFirst();
+ List<Dn> entryDnLst = new ArrayList<Dn>();
while ( cursor.next() )
{
Entry entry = cursor.get();
@@ -1110,11 +1153,7 @@ public class ReplicationConsumerImpl imp
{
/** A field used to tell the thread it should stop */
private volatile boolean stop = false;
-
- /** A mutex used to make the thread sleeping for a moment */
- private final Object mutex = new Object();
-
-
+
public RefresherThread()
{
setDaemon( true );
@@ -1132,7 +1171,7 @@ public class ReplicationConsumerImpl imp
doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false );
LOG.info( "--------------------- Sleep for a little while ------------------" );
- mutex.wait( config.getRefreshInterval() );
+ Thread.sleep( config.getRefreshInterval() );
LOG.debug( "--------------------- syncing again ------------------" );
}
@@ -1151,11 +1190,22 @@ public class ReplicationConsumerImpl imp
public void stopRefreshing()
{
stop = true;
-
- // just in case if it is sleeping, wake up the thread
- mutex.notify();
}
}
+
+
+ private synchronized Object getLockFor( String uuid )
+ {
+ Object lock = uuidLockMap.get( uuid );
+
+ if( lock == null )
+ {
+ lock = new Object();
+ uuidLockMap.put( uuid, lock );
+ }
+
+ return lock;
+ }
/**
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java Tue Oct 16 13:27:19 2012
@@ -41,6 +41,7 @@ import org.apache.directory.shared.ldap.
import org.apache.directory.shared.ldap.model.entry.ModificationOperation;
import org.apache.directory.shared.ldap.model.entry.StringValue;
import org.apache.directory.shared.ldap.model.exception.LdapEntryAlreadyExistsException;
+import org.apache.directory.shared.ldap.model.exception.LdapException;
import org.apache.directory.shared.ldap.model.filter.EqualityNode;
import org.apache.directory.shared.ldap.model.filter.ExprNode;
import org.apache.directory.shared.ldap.model.message.AliasDerefMode;
@@ -176,9 +177,7 @@ public class ReplConsumerManager
SchemaConstants.ADS_REPL_SEARCH_FILTER, replica.getSearchFilter() );
adminSession.add( entry );
-
- // Last, create a
-
+
LOG.debug( "stored replication consumer entry {}", consumerDn );
}
@@ -189,7 +188,7 @@ public class ReplConsumerManager
* @param replica The added consumer replica
* @throws Exception If the addition failed
*/
- public void deleteConsumerEntry( ReplicaEventLog replica ) throws Exception
+ public void deleteConsumerEntry( ReplicaEventLog replica ) throws LdapException
{
if ( replica == null )
{
@@ -201,7 +200,7 @@ public class ReplConsumerManager
Dn consumerDn = directoryService.getDnFactory().create(
SchemaConstants.ADS_DS_REPLICA_ID + "=" + replica.getId() + "," + REPL_CONSUMER_DN );
- PROVIDER_LOG.debug( "Deleting the {} consumer", consumerDn );
+ PROVIDER_LOG.debug( "Trying to delete the consumer entry {}", consumerDn );
if ( !adminSession.exists( consumerDn ) )
{
@@ -209,7 +208,7 @@ public class ReplConsumerManager
String message = "The replica " + consumerDn.getName() + " does not exist";
LOG.error( message );
PROVIDER_LOG.debug( message );
- throw new LdapEntryAlreadyExistsException( message );
+ return;
}
// Delete the consumer entry
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java Tue Oct 16 13:27:19 2012
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
* <li>hostname : the consumer's host</li>
* <li>searchFilter : the filter</li>
* <li>lastSentCsn : the last CSN sent by the consumer</li>
- * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and presist mode</li>
+ * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and persist mode</li>
* <li></li>
* </ul>
* A separate log is maintained for each syncrepl consumer.<br/>
@@ -65,7 +65,7 @@ public class ReplicaEventLog implements
/** A logger for the replication provider */
private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( "PROVIDER_LOG" );
- /** IP address of the syncrepl consumer */
+ /** hostname of the syncrepl consumer */
private String hostName;
/** the unmodified search filter as it was when received from the client */
@@ -99,7 +99,8 @@ public class ReplicaEventLog implements
/** A flag used to indicate that the consumer is not up to date */
private volatile boolean dirty;
-
+ public static final String REPLICA_EVENT_LOG_NAME_PREFIX = "REPL_EVENT_LOG.";
+
/**
* Creates a new instance of EventLog for a replica
* @param replicaId The replica ID
@@ -112,16 +113,16 @@ public class ReplicaEventLog implements
this.searchCriteria = new NotificationCriteria();
this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
- // Create the journal file, or open it of it exists
- File logDir = directoryService.getInstanceLayout().getLogDirectory();
- journalFile = new File( logDir, "journalRepl." + replicaId );
+ // Create the journal file, or open it if it already exists
+ File replDir = directoryService.getInstanceLayout().getReplDirectory();
+ journalFile = new File( replDir, REPLICA_EVENT_LOG_NAME_PREFIX + replicaId );
recman = new BaseRecordManager( journalFile.getAbsolutePath() );
SerializableComparator<String> comparator = new SerializableComparator<String>(
SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
comparator.setSchemaManager( schemaManager );
- journal = new JdbmTable<String, ReplicaEventMessage>( schemaManager, "replication", recman, comparator,
+ journal = new JdbmTable<String, ReplicaEventMessage>( schemaManager, journalFile.getName(), recman, comparator,
StringSerializer.INSTANCE, new ReplicaEventMessageSerializer( schemaManager ) );
}
@@ -131,7 +132,7 @@ public class ReplicaEventLog implements
*
* @param message The message to store
*/
- public void log( ReplicaEventMessage message )
+ public synchronized void log( ReplicaEventMessage message )
{
try
{
@@ -325,7 +326,7 @@ public class ReplicaEventLog implements
/**
* Update the last Sent CSN. If it's different from the present one, we
- * will set the dirty flag to true, and a replication will follow.
+ * will set the dirty flag to true, and it will be stored in DIT.
*
* @param lastSentCsn The new Sent CSN
*/
@@ -418,7 +419,32 @@ public class ReplicaEventLog implements
return new ReplicaJournalCursor( journal, consumerCsn );
}
+
+ /**
+ * @return the name of this replica log
+ */
+ public String getName()
+ {
+ return journal.getName();
+ }
+
+
+ /**
+ * @return the number of entries present in the replica log
+ */
+ public synchronized int count()
+ {
+ try
+ {
+ return journal.count();
+ }
+ catch( IOException e )
+ {
+ throw new RuntimeException( e );
+ }
+ }
+
/**
* {@inheritDoc}
*/
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java Tue Oct 16 13:27:19 2012
@@ -21,10 +21,12 @@
package org.apache.directory.server.ldap.replication.provider;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
+import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
import org.apache.directory.shared.ldap.model.cursor.AbstractCursor;
import org.apache.directory.shared.ldap.model.cursor.Cursor;
import org.apache.directory.shared.ldap.model.cursor.Tuple;
@@ -60,6 +62,9 @@ public class ReplicaJournalCursor extend
private ReplicaEventMessage qualifiedEvtMsg;
+ /** used while cleaning up the log */
+ private boolean skipQualifying;
+
/**
* Creates a cursor on top of the given journal
@@ -71,9 +76,9 @@ public class ReplicaJournalCursor extend
{
if ( IS_DEBUG )
{
- LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
+ LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
}
-
+
this.journal = journal;
this.tupleCursor = journal.cursor();
this.consumerCsn = consumerCsn;
@@ -147,34 +152,30 @@ public class ReplicaJournalCursor extend
*
* @throws Exception
*/
- private void selectQualified() throws Exception
+ private boolean isQualified( String csn, ReplicaEventMessage evtMsg ) throws Exception
{
- Tuple<String, ReplicaEventMessage> t = tupleCursor.get();
-
- qualifiedEvtMsg = t.getValue();
+ LOG.debug( "ReplicaEventMessage: {}", evtMsg );
- LOG.debug( "ReplicaEventMessage: {}", qualifiedEvtMsg );
-
- if ( qualifiedEvtMsg.isEventOlderThan( consumerCsn ) )
+ if ( evtMsg.isEventOlderThan( consumerCsn ) )
{
if ( LOG.isDebugEnabled() )
{
String evt = "MODDN"; // take this as default cause the event type for MODDN is null
- ChangeType changeType = qualifiedEvtMsg.getChangeType();
+ ChangeType changeType = evtMsg.getChangeType();
if ( changeType != null )
{
evt = changeType.name();
}
- LOG.debug( "event {} for dn {} is not qualified for sending", evt, qualifiedEvtMsg.getEntry().getDn() );
+ LOG.debug( "event {} for dn {} is not qualified for sending", evt, evtMsg.getEntry().getDn() );
}
- // TODO need to be checked if this causes issues in JDBM
- journal.remove( t.getKey() );
- qualifiedEvtMsg = null;
+ return false;
}
+
+ return true;
}
@@ -194,12 +195,28 @@ public class ReplicaJournalCursor extend
{
while ( tupleCursor.next() )
{
- selectQualified();
+ Tuple<String, ReplicaEventMessage> tuple = tupleCursor.get();
- if ( qualifiedEvtMsg != null )
+ String csn = tuple.getKey();
+ ReplicaEventMessage message = tuple.getValue();
+
+ if ( skipQualifying )
+ {
+ qualifiedEvtMsg = message;
+ return true;
+ }
+
+ boolean qualified = isQualified( csn, message );
+
+ if ( qualified )
{
+ qualifiedEvtMsg = message;
return true;
}
+ else
+ {
+ journal.remove( csn );
+ }
}
qualifiedEvtMsg = null;
@@ -225,9 +242,9 @@ public class ReplicaJournalCursor extend
{
if ( IS_DEBUG )
{
- LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
+ LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
}
-
+
tupleCursor.close();
super.close();
}
@@ -241,15 +258,45 @@ public class ReplicaJournalCursor extend
{
if ( IS_DEBUG )
{
- LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
+ LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
}
-
+
tupleCursor.close();
super.close( cause );
}
/**
+ * Sets the flag that makes this cursor skip the CSN based qualification check while traversing.
+ * Used for internal log cleanup ONLY.
+ */
+ protected void skipQualifyingWhileFetching()
+ {
+ skipQualifying = true;
+ }
+
+
+ /**
+ * Deletes the current message from the journal, keyed by the entryCSN value of the message's entry.
+ * Used for internal log cleanup ONLY; a failure is logged and otherwise ignored.
+ */
+ protected void delete()
+ {
+ try
+ {
+ if ( qualifiedEvtMsg != null )
+ {
+ journal.remove( qualifiedEvtMsg.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+ }
+ }
+ catch ( Exception e )
+ {
+ LOG.warn( "Failed to delete the current message from the replica journal", e );
+ }
+ }
+
+
+ /**
* {@inheritDoc}
*/
@Override