You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2012/10/16 15:27:20 UTC

svn commit: r1398782 [1/2] - in /directory/apacheds/trunk: core-api/src/main/java/org/apache/directory/server/core/api/event/ core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/ core-jndi/src/main/java/org/apache/directory/...

Author: kayyagari
Date: Tue Oct 16 13:27:19 2012
New Revision: 1398782

URL: http://svn.apache.org/viewvc?rev=1398782&view=rev
Log:
first drop of changes for supporting MMR

Modified:
    directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java
    directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java
    directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java
    directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java
    directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java
    directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java
    directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java
    directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java
    directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java
    directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java
    directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java
    directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java

Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/DirectoryListener.java Tue Oct 16 13:27:19 2012
@@ -81,4 +81,12 @@ public interface DirectoryListener
      * @param moveAndRenameContext the move/rename operation context responsible for the change
      */
     void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext );
+    
+    
+    /**
+     * indicates if this listener needs to be invoked synchronously
+     *  
+     * @return true if should be invoked synchronously, false otherwise
+     */
+    boolean isSynchronous();
 }

Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/event/EventType.java Tue Oct 16 13:27:19 2012
@@ -35,12 +35,13 @@ import org.apache.directory.shared.ldap.
  */
 public enum EventType
 {
-    ADD(1),
-    DELETE(2),
-    MODIFY(4),
-    RENAME(8),
-    MOVE(16);
-
+    ADD(1), 
+    DELETE(2), 
+    MODIFY(4), 
+    RENAME(8), 
+    MOVE(16),
+    MOVE_AND_RENAME(24);    
+    
     public static final int ALL_EVENT_TYPES_MASK = getAllEventTypesMask();
     public static final int MOVE_AND_RENAME_MASK = MOVE.mask | RENAME.mask;
     private static final EventType[] EMPTY_EVENT_ARRAY = new EventType[0];

Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/AbstractChangeOperationContext.java Tue Oct 16 13:27:19 2012
@@ -43,7 +43,22 @@ public abstract class AbstractChangeOper
     /** The modified Entry as it will be stored into the backend */
     protected Entry modifiedEntry;
 
-
+    /** flag to indicate if this context is carrying a replicated entry */
+    private boolean replEvent;
+    
+    /** the rid present in the cookie received from a replication consumer */
+    private int rid = -1; // default value, an invalid rid
+    
+    /** a flag to indicate when we don't want a replication event to be generated after this operation */
+    private boolean generateNoReplEvt;
+    
+    /** 
+     * flag to tell if this context needs to be sent to the event interceptor manually
+     * This is used only internally where certain modifications do not go through event
+     * interceptor.  
+     */
+    private boolean pushToEvtIntrcptor;
+    
     /**
      * 
      * Creates a new instance of AbstractChangeOperationContext.
@@ -117,4 +132,83 @@ public abstract class AbstractChangeOper
     {
         return logChange != LogChange.FALSE;
     }
+
+
+    /**
+     * @return true if this context is containing a replication event
+     */
+    public boolean isReplEvent()
+    {
+        return replEvent;
+    }
+
+
+    /**
+     * @param replEvent mark the context as containing a replication event
+     */
+    public void setReplEvent( boolean replEvent )
+    {
+        this.replEvent = replEvent;
+    }
+
+
+    /**
+     * @return the replica ID received from a consumer
+     */
+    public int getRid()
+    {
+        return rid;
+    }
+
+
+    /**
+     * sets the replica ID received from a consumer
+     * @param rid 
+     */
+    public void setRid( int rid )
+    {
+        this.rid = rid;
+    }
+
+
+    /**
+     * @return true if a replication event shouldn't be generated for the changes
+     *         done using this operation context, false otherwise
+     */
+    public boolean isGenerateNoReplEvt()
+    {
+        return generateNoReplEvt;
+    }
+
+
+    /**
+     * sets whether or not to generate replication event messages by after an operation
+     * using this operation context completes
+     * 
+     * @param generateNoReplEvt
+     */
+    public void setGenerateNoReplEvt( boolean generateNoReplEvt )
+    {
+        this.generateNoReplEvt = generateNoReplEvt;
+    }
+
+
+    /**
+     * @return true if this context needs to be pushed to the event interceptor from nexus
+     */
+    public boolean isPushToEvtIntrcptor()
+    {
+        return pushToEvtIntrcptor;
+    }
+
+
+    /**
+     * sets if this context needs to be pushed to the event interceptor from nexus
+     * 
+     * @param pushToEvtIntrcptor
+     */
+    public void setPushToEvtIntrcptor( boolean pushToEvtIntrcptor )
+    {
+        this.pushToEvtIntrcptor = pushToEvtIntrcptor;
+    }
 }

Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/LookupOperationContext.java Tue Oct 16 13:27:19 2012
@@ -54,6 +54,8 @@ public class LookupOperationContext exte
     /** A flag set to true if the user has requested no attribute to be returned */
     private Boolean noAttribute;
 
+    /** flag to indicate if this search is done for replication */
+    private boolean syncreplLookup;
 
     /**
      * 
@@ -284,8 +286,28 @@ public class LookupOperationContext exte
         return "Lookup";
     }
 
+    
+    /**
+     * @return true if this is a syncrepl specific search
+     */
+    public boolean isSyncreplLookup()
+    {
+        return syncreplLookup;
+    }
+
 
     /**
+     * sets the flag to indicate if this is a synrepl specific search or not
+     * 
+     * @param syncreplLookup
+     */
+    public void setSyncreplLookup( boolean syncreplLookup )
+    {
+        this.syncreplLookup = syncreplLookup;
+    }
+    
+    
+    /**
      * @see Object#toString()
      */
     public String toString()

Modified: directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java (original)
+++ directory/apacheds/trunk/core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/SearchOperationContext.java Tue Oct 16 13:27:19 2012
@@ -52,6 +52,9 @@ public class SearchOperationContext exte
     private ExprNode filter;
 
 
+    /** flag to indicate if this search is done for replication */
+    private boolean syncreplSearch;
+    
     /**
      * Creates a new instance of SearchOperationContext.
      */
@@ -213,4 +216,24 @@ public class SearchOperationContext exte
     {
         return MessageTypeEnum.SEARCH_REQUEST.name();
     }
+
+
+    /**
+     * @return true if this is a syncrepl specific search
+     */
+    public boolean isSyncreplSearch()
+    {
+        return syncreplSearch;
+    }
+
+
+    /**
+     * sets the flag to indicate if this is a synrepl specific search or not
+     * 
+     * @param syncreplSearch
+     */
+    public void setSyncreplSearch( boolean syncreplSearch )
+    {
+        this.syncreplSearch = syncreplSearch;
+    }
 }

Modified: directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java (original)
+++ directory/apacheds/trunk/core-jndi/src/main/java/org/apache/directory/server/core/jndi/EventListenerAdapter.java Tue Oct 16 13:27:19 2012
@@ -80,6 +80,13 @@ public class EventListenerAdapter implem
         this.source = source;
         this.listener = listener;
     }
+    
+    
+    @Override
+	public boolean isSynchronous()
+    {
+		return false; // always asynchronous
+	}
 
 
     private void deliverNamingExceptionEvent( Exception e )

Modified: directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java (original)
+++ directory/apacheds/trunk/interceptor-kerberos/src/main/java/org/apache/directory/server/core/kerberos/KeyDerivationInterceptor.java Tue Oct 16 13:27:19 2012
@@ -92,6 +92,12 @@ public class KeyDerivationInterceptor ex
      */
     public void add( AddOperationContext addContext ) throws LdapException
     {
+        if ( addContext.isReplEvent() )
+        {
+            next( addContext );
+            return;
+        }
+        
         Dn normName = addContext.getDn();
 
         Entry entry = addContext.getEntry();
@@ -147,6 +153,12 @@ public class KeyDerivationInterceptor ex
      */
     public void modify( ModifyOperationContext modContext ) throws LdapException
     {
+        if( modContext.isReplEvent() )
+        {
+            next( modContext );
+            return;
+        }
+        
         ModifySubContext subContext = new ModifySubContext();
 
         detectPasswordModification( modContext, subContext );

Modified: directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/authn/src/main/java/org/apache/directory/server/core/authn/AuthenticationInterceptor.java Tue Oct 16 13:27:19 2012
@@ -313,7 +313,8 @@ public class AuthenticationInterceptor e
 
         Entry entry = addContext.getEntry();
 
-        if ( !directoryService.isPwdPolicyEnabled() )
+
+        if ( !directoryService.isPwdPolicyEnabled() || addContext.isReplEvent() )
         {
             next( addContext );
             return;
@@ -594,7 +595,8 @@ public class AuthenticationInterceptor e
                     bindModCtx.setDn( dn );
                     bindModCtx.setEntry( userEntry );
                     bindModCtx.setModItems( mods );
-                    
+                    bindModCtx.setPushToEvtIntrcptor( true );
+
                     directoryService.getPartitionNexus().modify( bindModCtx );
                 }
             }
@@ -674,7 +676,8 @@ public class AuthenticationInterceptor e
                 bindModCtx.setDn( dn );
                 bindModCtx.setEntry( userEntry );
                 bindModCtx.setModItems( mods );
-                
+                bindModCtx.setPushToEvtIntrcptor( true );
+
                 directoryService.getPartitionNexus().modify( bindModCtx );
             }
 
@@ -830,7 +833,8 @@ public class AuthenticationInterceptor e
 
         checkAuthenticated( modifyContext );
 
-        if ( !directoryService.isPwdPolicyEnabled() )
+
+        if ( ! directoryService.isPwdPolicyEnabled() || modifyContext.isReplEvent() )
         {
             next( modifyContext );
             invalidateAuthenticatorCaches( modifyContext.getDn() );

Modified: directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/collective/src/main/java/org/apache/directory/server/core/collective/CollectiveAttributeInterceptor.java Tue Oct 16 13:27:19 2012
@@ -146,6 +146,12 @@ public class CollectiveAttributeIntercep
     {
         Entry result = next( lookupContext );
 
+        // do not add collective attributes
+        if( lookupContext.isSyncreplLookup() )
+        {
+            return result;
+        }
+        
         // Adding the collective attributes if any
         if ( ( lookupContext.getAttrsId() == null ) || ( lookupContext.getAttrsId().size() == 0 ) )
         {
@@ -178,7 +184,11 @@ public class CollectiveAttributeIntercep
     {
         EntryFilteringCursor cursor = next( searchContext );
 
-        cursor.addEntryFilter( SEARCH_FILTER );
+        // only add collective attributes for non-syncrepl search
+        if( !searchContext.isSyncreplSearch() )
+        {
+            cursor.addEntryFilter( SEARCH_FILTER );
+        }
 
         return cursor;
     }

Modified: directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/event/src/main/java/org/apache/directory/server/core/event/EventInterceptor.java Tue Oct 16 13:27:19 2012
@@ -107,58 +107,93 @@ public class EventInterceptor extends Ba
         switch ( type )
         {
             case ADD:
-                executor.execute( new Runnable()
+                if ( listener.isSynchronous() )
                 {
-                    public void run()
+                    listener.entryAdded( ( AddOperationContext ) opContext );
+                }
+                else
+                {
+                    executor.execute( new Runnable()
                     {
-                        listener.entryAdded( ( AddOperationContext ) opContext );
-                    }
-                } );
-
+                        public void run()
+                        {
+                            listener.entryAdded( ( AddOperationContext ) opContext );
+                        }
+                    } );
+                }
+                
                 break;
-
+                
             case DELETE:
-                executor.execute( new Runnable()
+                if ( listener.isSynchronous() )
                 {
-                    public void run()
+                    listener.entryDeleted( ( DeleteOperationContext ) opContext );
+                }
+                else
+                {
+                    executor.execute( new Runnable()
                     {
-                        listener.entryDeleted( ( DeleteOperationContext ) opContext );
-                    }
-                } );
-
+                        public void run()
+                        {
+                            listener.entryDeleted( ( DeleteOperationContext ) opContext );
+                        }
+                    } );
+                }
+                
                 break;
-
+                
             case MODIFY:
-                executor.execute( new Runnable()
+                if ( listener.isSynchronous() )
                 {
-                    public void run()
+                    listener.entryModified( ( ModifyOperationContext ) opContext );
+                }
+                else
+                {
+                    executor.execute( new Runnable()
                     {
-                        listener.entryModified( ( ModifyOperationContext ) opContext );
-                    }
-                } );
-
+                        public void run()
+                        {
+                            listener.entryModified( ( ModifyOperationContext ) opContext );
+                        }
+                    } );
+                }
+                
                 break;
-
+                
             case MOVE:
-                executor.execute( new Runnable()
+                if ( listener.isSynchronous() )
                 {
-                    public void run()
+                    listener.entryMoved( ( MoveOperationContext ) opContext );                    
+                }
+                else
+                {
+                    executor.execute( new Runnable()
                     {
-                        listener.entryMoved( ( MoveOperationContext ) opContext );
-                    }
-                } );
-
+                        public void run()
+                        {
+                            listener.entryMoved( ( MoveOperationContext ) opContext );
+                        }
+                    } );
+                }
+                
                 break;
 
             case RENAME:
-                executor.execute( new Runnable()
+                if ( listener.isSynchronous() )
                 {
-                    public void run()
+                    listener.entryRenamed( ( RenameOperationContext ) opContext );
+                }
+                else
+                {
+                    executor.execute( new Runnable()
                     {
-                        listener.entryRenamed( ( RenameOperationContext ) opContext );
-                    }
-                } );
-
+                        public void run()
+                        {
+                            listener.entryRenamed( ( RenameOperationContext ) opContext );
+                        }
+                    } );
+                }
+                
                 break;
         }
     }
@@ -220,7 +255,11 @@ public class EventInterceptor extends Ba
 
         List<RegistrationEntry> selecting = getSelectingRegistrations( modifyContext.getDn(), oriEntry );
 
-        next( modifyContext );
+        // modification was already done when this flag is turned on, move to sending the events
+        if( !modifyContext.isPushToEvtIntrcptor() )
+        {
+            next( modifyContext );
+        }
 
         if ( selecting.isEmpty() )
         {

Modified: directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/operational/src/main/java/org/apache/directory/server/core/operational/OperationalAttributeInterceptor.java Tue Oct 16 13:27:19 2012
@@ -36,6 +36,7 @@ import org.apache.directory.server.core.
 import org.apache.directory.server.core.api.interceptor.BaseInterceptor;
 import org.apache.directory.server.core.api.interceptor.Interceptor;
 import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.ListOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
@@ -440,6 +441,10 @@ public class OperationalAttributeInterce
         Entry modifiedEntry = moveContext.getOriginalEntry().clone();
         modifiedEntry.put( SchemaConstants.MODIFIERS_NAME_AT, getPrincipal( moveContext ).getName() );
         modifiedEntry.put( SchemaConstants.MODIFY_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
+        
+        Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+        modifiedEntry.put( csnAt );
+
         modifiedEntry.setDn( moveContext.getNewDn() );
         moveContext.setModifiedEntry( modifiedEntry );
 
@@ -456,6 +461,10 @@ public class OperationalAttributeInterce
         modifiedEntry.put( SchemaConstants.MODIFIERS_NAME_AT, getPrincipal( moveAndRenameContext ).getName() );
         modifiedEntry.put( SchemaConstants.MODIFY_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
         modifiedEntry.setDn( moveAndRenameContext.getNewDn() );
+        
+        Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+        modifiedEntry.put( csnAt );
+
         moveAndRenameContext.setModifiedEntry( modifiedEntry );
 
         next( moveAndRenameContext );
@@ -474,6 +483,10 @@ public class OperationalAttributeInterce
         Entry modifiedEntry = renameContext.getOriginalEntry().clone();
         modifiedEntry.put( SchemaConstants.MODIFIERS_NAME_AT, getPrincipal( renameContext ).getName() );
         modifiedEntry.put( SchemaConstants.MODIFY_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
+        
+        Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+        modifiedEntry.put( csnAt );
+
         renameContext.setModifiedEntry( modifiedEntry );
 
         next( renameContext );
@@ -498,12 +511,22 @@ public class OperationalAttributeInterce
             return cursor;
         }
 
-        cursor.addEntryFilter( SEARCH_FILTER );
-
         return cursor;
     }
 
 
+    @Override
+    public void delete( DeleteOperationContext deleteContext ) throws LdapException
+    {
+        // insert a new CSN into the entry, this is for replication
+        Entry entry = deleteContext.getEntry();        
+        Attribute csnAt = new DefaultAttribute( ENTRY_CSN_AT, directoryService.getCSN().toString() );
+        entry.put( csnAt );
+
+        next( deleteContext );
+    }
+
+
     /**
      * Filters out the operational attributes within a search results attributes.  The attributes are directly
      * modified.

Modified: directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java (original)
+++ directory/apacheds/trunk/interceptors/subtree/src/main/java/org/apache/directory/server/core/subtree/SubentryInterceptor.java Tue Oct 16 13:27:19 2012
@@ -1567,6 +1567,12 @@ public class SubentryInterceptor extends
             return cursor;
         }
 
+        // DO NOT hide subentries for replication operations
+        if( searchContext.isSyncreplSearch() )
+        {
+            return cursor;
+        }
+        
         // for subtree and one level scope we filter
         if ( !isSubentryVisible( searchContext ) )
         {

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapProtocolUtils.java Tue Oct 16 13:27:19 2012
@@ -43,8 +43,8 @@ public class LdapProtocolUtils
     /** the prefix for replicaId value */
     public static final String REPLICA_ID_PREFIX = "rid=";
 
-    private static final int REPLICA_ID_PREFIX_LEN = REPLICA_ID_PREFIX.length();
-
+    public static final int REPLICA_ID_PREFIX_LEN = REPLICA_ID_PREFIX.length();
+    
     /** the prefix for Csn value */
     public static final String CSN_PREFIX = "csn=";
 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/PersistentSearchListener.java Tue Oct 16 13:27:19 2012
@@ -72,6 +72,13 @@ public class PersistentSearchListener im
         this.psearchControl = ( PersistentSearch ) req.getControls().get( PersistentSearch.OID );
     }
 
+    
+    @Override
+	public boolean isSynchronous()
+    {
+		return false; // always asynchronous
+	}
+
 
     public void abandon() throws LdapException
     {

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/handlers/SearchAbandonListener.java Tue Oct 16 13:27:19 2012
@@ -79,7 +79,7 @@ public class SearchAbandonListener imple
 
         try
         {
-            if ( cursor != null )
+            if ( ( cursor != null  ) && !cursor.isClosed() )
             {
                 /*
                  * When this method is called due to an abandon request it 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java Tue Oct 16 13:27:19 2012
@@ -35,11 +35,10 @@ public class ReplicaEventMessage
 {
     /** The message change type */
     private ChangeType changeType;
-
+    
     /** The entry */
     private Entry entry;
 
-
     /**
      * Create a new ReplicaEvent instance for a Add/Delete+Modify operation
      * @param changeType The change type
@@ -79,10 +78,15 @@ public class ReplicaEventMessage
      */
     public boolean isEventOlderThan( String csn ) throws Exception
     {
+    	if( csn == null )
+    	{
+    		return false;
+    	}
+    	
         String entryCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
-
+        
         int i = entryCsn.compareTo( csn );
-
-        return ( i < 0 );
+        
+        return ( i <= 0 );
     }
 }

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java Tue Oct 16 13:27:19 2012
@@ -126,6 +126,8 @@ public class SyncreplConfiguration imple
     /** the X509 certificate trust manager used, default value set to {@link NoVerificationTrustManager} */
     private X509TrustManager trustManager = new NoVerificationTrustManager();
 
+    /** flag to indicate if this node is part of a MMR setup, default value is true */
+    private boolean mmrMode = true;
 
     /**
      * Creates a new instance of SyncreplConfiguration
@@ -134,9 +136,7 @@ public class SyncreplConfiguration imple
     {
         attributes = new HashSet<String>();
         attributes.add( SchemaConstants.ALL_USER_ATTRIBUTES );
-        attributes.add( Strings.toLowerCase( SchemaConstants.ENTRY_UUID_AT ) );
-        attributes.add( Strings.toLowerCase( SchemaConstants.ENTRY_CSN_AT ) );
-        attributes.add( Strings.toLowerCase( SchemaConstants.REF_AT ) );
+        attributes.add( Strings.toLowerCase( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES ) );
     }
 
 
@@ -452,17 +452,6 @@ public class SyncreplConfiguration imple
     }
 
 
-    /** a flag to indicate to store the cookie in a file, default is false
-     *  NOTE: a value of true indicates that the cookie will be stored
-     *  on file system, which is useful while testing the consumer
-     *  without loading config partition
-     */
-    public boolean isStoreCookieInFile()
-    {
-        return ( configEntryDn == null );
-    }
-
-
     /**
      * Tells if we chase referrals
      * @return true if we chase referals
@@ -561,4 +550,23 @@ public class SyncreplConfiguration imple
     {
         this.configEntryDn = configEntryDn;
     }
+
+    /**
+     * @return true if this node is part of MMR setup
+     */
+    public boolean isMmrMode()
+    {
+        return mmrMode;
+    }
+
+
+    /**
+     * enable/disable MMR option
+     *
+     * @param mmrMode
+     */
+    public void setMmrMode( boolean mmrMode )
+    {
+        this.mmrMode = mmrMode;
+    }
 }

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java Tue Oct 16 13:27:19 2012
@@ -20,10 +20,6 @@
 package org.apache.directory.server.ldap.replication.consumer;
 
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,13 +29,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.collections.map.LRUMap;
 import org.apache.directory.ldap.client.api.ConnectionClosedEventListener;
 import org.apache.directory.ldap.client.api.LdapNetworkConnection;
 import org.apache.directory.ldap.client.api.future.SearchFuture;
 import org.apache.directory.server.core.api.CoreSession;
 import org.apache.directory.server.core.api.DirectoryService;
+import org.apache.directory.server.core.api.OperationManager;
 import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
-import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
+import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
+import org.apache.directory.server.ldap.LdapProtocolUtils;
 import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
 import org.apache.directory.server.ldap.replication.SyncreplConfiguration;
 import org.apache.directory.shared.ldap.codec.controls.manageDsaIT.ManageDsaITDecorator;
@@ -53,6 +57,7 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncInfoValueDecorator;
 import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncRequestValueDecorator;
 import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.model.csn.Csn;
 import org.apache.directory.shared.ldap.model.entry.Attribute;
 import org.apache.directory.shared.ldap.model.entry.DefaultAttribute;
 import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
@@ -117,9 +122,6 @@ public class ReplicationConsumerImpl imp
     /** the schema manager */
     private SchemaManager schemaManager;
 
-    /** the cookie file */
-    private File cookieFile;
-
     /** flag to indicate whether the consumer was disconnected */
     private boolean disconnected;
 
@@ -129,14 +131,12 @@ public class ReplicationConsumerImpl imp
     /** attributes on which modification should be ignored */
     private static final String[] MOD_IGNORE_AT = new String[]
         {
-            SchemaConstants.ENTRY_UUID_AT,
-            SchemaConstants.ENTRY_CSN_AT,
-            SchemaConstants.MODIFIERS_NAME_AT,
-            SchemaConstants.MODIFY_TIMESTAMP_AT,
-            SchemaConstants.CREATE_TIMESTAMP_AT,
-            SchemaConstants.CREATORS_NAME_AT,
-            SchemaConstants.ENTRY_PARENT_ID_AT
-    };
+            SchemaConstants.ENTRY_UUID_AT, 
+            SchemaConstants.CREATE_TIMESTAMP_AT, 
+            SchemaConstants.CREATORS_NAME_AT, 
+            SchemaConstants.ENTRY_PARENT_ID_AT,
+            SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT
+        };
 
     /** A thread used to refresh in refreshOnly mode */
     private RefresherThread refreshThread;
@@ -151,12 +151,16 @@ public class ReplicationConsumerImpl imp
     private static final Set<AttributeTypeOptions> ENTRY_UUID_ATOP_SET = new HashSet<AttributeTypeOptions>();
 
     private Modification cookieMod;
+    
+    private Modification ridMod;
 
     /** AttributeTypes used for replication */
     private static AttributeType COOKIE_AT_TYPE;
     private static AttributeType ENTRY_UUID_AT;
+    private static AttributeType RID_AT_TYPE;
 
-
+    private static final Map<String,Object> uuidLockMap = new LRUMap( 1000 );
+    
     /**
      * @return the config
      */
@@ -174,31 +178,22 @@ public class ReplicationConsumerImpl imp
     {
         this.directoryService = directoryservice;
 
-        if ( config.isStoreCookieInFile() )
-        {
-            File cookieDir = new File( directoryservice.getInstanceLayout().getRunDirectory(), "cookies" );
-            if ( !cookieDir.mkdir() )
-            {
-                throw new IOException( I18n.err( I18n.ERR_112_COULD_NOT_CREATE_DIRECORY, cookieDir ) );
-            }
-
-            cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) );
-        }
-
         session = directoryService.getAdminSession();
 
         schemaManager = directoryservice.getSchemaManager();
 
         ENTRY_UUID_AT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ENTRY_UUID_AT );
         COOKIE_AT_TYPE = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE );
-
+        RID_AT_TYPE = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_DS_REPLICA_ID );
+        
         ENTRY_UUID_ATOP_SET.add( new AttributeTypeOptions( ENTRY_UUID_AT ) );
 
         Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
+        cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
 
-        Modification cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
-        this.cookieMod = cookieMod;
-
+        Attribute ridAttr = new DefaultAttribute( RID_AT_TYPE );
+        ridMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, ridAttr );
+        
         prepareSyncSearchRequest();
     }
 
@@ -211,11 +206,11 @@ public class ReplicationConsumerImpl imp
      */
     public boolean connect()
     {
+        String providerHost = config.getRemoteHost();
+        int port = config.getRemotePort();
+
         try
         {
-            String providerHost = config.getRemoteHost();
-            int port = config.getRemotePort();
-
             // Create a connection
             if ( connection == null )
             {
@@ -246,7 +241,8 @@ public class ReplicationConsumerImpl imp
         }
         catch ( Exception e )
         {
-            LOG.error( "Failed to bind with the given bindDN and credentials", e );
+            LOG.error( "Failed to connect to the server {}:{}", providerHost, String.valueOf( port ) );
+            LOG.error( "", e );
         }
 
         return false;
@@ -318,62 +314,83 @@ public class ReplicationConsumerImpl imp
 
         try
         {
-            Entry remoteEntry = syncResult.getEntry();
-
-            if ( syncStateCtrl.getCookie() != null )
-            {
-                syncCookie = syncStateCtrl.getCookie();
-                LOG.debug( "assigning the cookie from sync state value control: "
-                    + Strings.utf8ToString( syncCookie ) );
-            }
-
-            SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
-
-            LOG.debug( "state name {}", state.name() );
+            Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() );
+            String uuid = remoteEntry.get( ENTRY_UUID_AT ).getString();
+            // lock on UUID to serialize the updates when there are multiple consumers
+            // connected to several producers and to the *same* base/partition
+            Object lock = getLockFor( uuid );
+            synchronized ( lock )
+            {
+                int rid = -1;
+                
+                if ( syncStateCtrl.getCookie() != null )
+                {
+                    syncCookie = syncStateCtrl.getCookie();
+                    rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) );
+                    LOG.debug( "assigning the cookie from sync state value control: "
+                        + Strings.utf8ToString(syncCookie) );
+                }
+                
+                SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
+                
+                LOG.debug( "state name {}", state.name() );
+                
+                // check to avoid conversion of UUID from byte[] to String
+                if ( LOG.isDebugEnabled() )
+                {
+                    LOG.debug( "entryUUID = {}", Strings.uuidToString(syncStateCtrl.getEntryUUID()) );
+                }
 
-            // check to avoid conversion of UUID from byte[] to String
-            if ( LOG.isDebugEnabled() )
-            {
-                LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) );
-            }
+                Dn remoteDn = remoteEntry.getDn();
 
             switch ( state )
             {
                 case ADD:
-                    Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
 
                     if ( !session.exists( remoteDn ) )
                     {
                         LOG.debug( "adding entry with dn {}", remoteDn );
                         LOG.debug( remoteEntry.toString() );
-                        session.add( new DefaultEntry( schemaManager, remoteEntry ) );
+                        AddOperationContext addContext = new AddOperationContext( session, remoteEntry );
+                        addContext.setReplEvent( true );
+                        addContext.setRid( rid );
+                        
+                        OperationManager operationManager = directoryService.getOperationManager();
+                        operationManager.add( addContext );
                     }
                     else
                     {
                         LOG.debug( "updating entry in refreshOnly mode {}", remoteDn );
-                        modify( remoteEntry );
+                        modify( remoteEntry, rid );
                     }
 
                     break;
 
                 case MODIFY:
                     LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
-                    modify( remoteEntry );
+                    modify( remoteEntry, rid );
 
                     break;
 
                 case MODDN:
                     String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() ).toString();
-                    applyModDnOperation( remoteEntry, entryUuid );
+                    applyModDnOperation( remoteEntry, entryUuid, rid );
 
                     break;
 
                 case DELETE:
                     LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
-                    // incase of a MODDN operation resulting in a branch to be moved out of scope
-                    // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
-                    // so the branch needs to be recursively deleted here
-                    deleteRecursive( remoteEntry.getDn(), null );
+                    if ( !session.exists( remoteDn ) )
+                    {
+                        LOG.debug( "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete", remoteDn );
+                    }
+                    else
+                    {
+		                // incase of a MODDN operation resulting in a branch to be moved out of scope
+		                // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
+		                // so the branch needs to be recursively deleted here
+		                deleteRecursive( remoteEntry.getDn(), null );
+                    }
 
                     break;
 
@@ -387,6 +404,7 @@ public class ReplicationConsumerImpl imp
             {
                 storeCookie();
             }
+          }
         }
         catch ( Exception e )
         {
@@ -418,11 +436,16 @@ public class ReplicationConsumerImpl imp
 
             byte[] cookie = syncInfoValue.getCookie();
 
+            int replicaId = -1;
+            
             if ( cookie != null )
             {
                 LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) );
                 CONSUMER_LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) );
                 syncCookie = cookie;
+
+                String cookieString = Strings.utf8ToString( syncCookie );
+                replicaId = LdapProtocolUtils.getReplicaId( cookieString );
             }
 
             LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() );
@@ -433,11 +456,11 @@ public class ReplicationConsumerImpl imp
             // present in the syncIdSet
             if ( syncInfoValue.isRefreshDeletes() )
             {
-                deleteEntries( uuidList, false );
+                deleteEntries( uuidList, false, replicaId );
             }
             else
             {
-                deleteEntries( uuidList, true );
+                deleteEntries( uuidList, true, replicaId );
             }
 
             LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
@@ -446,8 +469,7 @@ public class ReplicationConsumerImpl imp
         }
         catch ( Exception de )
         {
-            LOG.error( "Failed to handle syncinfo message" );
-            de.printStackTrace();
+            LOG.error( "Failed to handle syncinfo message", de );
         }
 
         LOG.debug( ".................... END handleSyncInfo ..............." );
@@ -720,28 +742,22 @@ public class ReplicationConsumerImpl imp
 
         try
         {
-            if ( config.isStoreCookieInFile() )
-            {
-                CONSUMER_LOG.debug( "Storingthe cookie in a file : {}", cookieFile );
-                FileOutputStream fout = new FileOutputStream( cookieFile );
-                fout.write( syncCookie.length );
-                fout.write( syncCookie );
-                fout.close();
-            }
-            else
-            {
-                Attribute attr = cookieMod.getAttribute();
-                attr.clear();
-                attr.add( syncCookie );
-
-                CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
-
-                session.modify( config.getConfigEntryDn(), cookieMod );
-
-                Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ALL_ATTRIBUTES_ARRAY );
-
-                CONSUMER_LOG.debug( "stored entry : {}", entry );
-            }
+            Attribute attr = cookieMod.getAttribute();
+            attr.clear();
+            attr.add( syncCookie );
+            
+            String cookieString = Strings.utf8ToString( syncCookie );
+            int replicaId = LdapProtocolUtils.getReplicaId( cookieString );
+            
+            Attribute ridAt = ridMod.getAttribute();
+            ridAt.clear();
+            ridAt.add( String.valueOf( replicaId ) );
+            
+            CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
+            
+            //session.modify( config.getConfigEntryDn(), cookieMod, ridMod );
+            session.modify( config.getConfigEntryDn(), cookieMod );
+            CONSUMER_LOG.debug( "stored the cookie in entry {}", config.getConfigEntryDn() );
 
             lastSavedCookie = new byte[syncCookie.length];
             System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
@@ -750,7 +766,7 @@ public class ReplicationConsumerImpl imp
         }
         catch ( Exception e )
         {
-            LOG.error( "Failed to store the cookie", e );
+            LOG.error( "Failed to store the cookie in consumer entry {}", config.getConfigEntryDn(), e );
         }
     }
 
@@ -762,55 +778,28 @@ public class ReplicationConsumerImpl imp
     {
         try
         {
-            if ( config.isStoreCookieInFile() )
-            {
-                CONSUMER_LOG.debug( "The cookie is stored in a file : {}", cookieFile );
-
-                if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
-                {
-                    FileInputStream fin = new FileInputStream( cookieFile );
-                    syncCookie = new byte[fin.read()];
-                    fin.read( syncCookie );
-                    fin.close();
-
-                    lastSavedCookie = new byte[syncCookie.length];
-                    System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
-
-                    LOG.debug( "read the cookie from file: " + Strings.utf8ToString( syncCookie ) );
-                }
-            }
-            else
-            {
-                try
+            Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ALL_ATTRIBUTES_ARRAY );
+            
+            CONSUMER_LOG.debug( "The cookie is stored in the DIT : {}", entry );
+            
+            if ( entry != null )
+            {
+                Attribute attr = entry.get( COOKIE_AT_TYPE );
+                
+                if ( attr != null )
                 {
-                    Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ALL_ATTRIBUTES_ARRAY );
-
-                    CONSUMER_LOG.debug( "The cookie is stored in the DIT : {}", entry );
-
-                    if ( entry != null )
-                    {
-                        Attribute attr = entry.get( COOKIE_AT_TYPE );
-
-                        if ( attr != null )
-                        {
-                            syncCookie = attr.getBytes();
-                            lastSavedCookie = syncCookie;
-                            CONSUMER_LOG.debug( "Read cookie : '{}'", attr );
-                            LOG.debug( "loaded cookie from DIT" );
-                        }
-                    }
-                }
-                catch ( Exception e )
-                {
-                    // can be ignored, most likely happens if there is no entry with the given Dn
-                    // log in debug mode
-                    LOG.debug( "Failed to read the cookie from the entry", e );
+                    syncCookie = attr.getBytes();
+                    lastSavedCookie = syncCookie;
+                    CONSUMER_LOG.debug( "Read cookie : '{}'", attr );
+                    LOG.debug( "loaded cookie from DIT" );
                 }
             }
         }
         catch ( Exception e )
         {
-            LOG.error( "Failed to read the cookie", e );
+            // can be ignored, most likely happens if there is no entry with the given Dn
+            // log in debug mode
+            LOG.debug( "Failed to read the cookie from the entry {}", config.getConfigEntryDn(), e );
         }
     }
 
@@ -820,28 +809,17 @@ public class ReplicationConsumerImpl imp
      */
     public void removeCookie()
     {
-        if ( config.isStoreCookieInFile() )
+        try
         {
-            if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
-            {
-                boolean deleted = cookieFile.delete();
-                LOG.info( "deleted cookie file {}", deleted );
-            }
+            Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
+            Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
+                cookieAttr );
+            session.modify( config.getConfigEntryDn(), deleteCookieMod );
         }
-        else
+        catch ( Exception e )
         {
-            try
-            {
-                Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
-                Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
-                    cookieAttr );
-                session.modify( config.getConfigEntryDn(), deleteCookieMod );
-            }
-            catch ( Exception e )
-            {
-                LOG.warn( "Failed to delete the cookie from the entry with Dn {}", config.getConfigEntryDn() );
-                LOG.warn( "{}", e );
-            }
+            LOG.warn( "Failed to delete the cookie from the entry with Dn {}", config.getConfigEntryDn() );
+            LOG.warn( "{}", e );
         }
 
         LOG.info( "resetting sync cookie" );
@@ -851,7 +829,7 @@ public class ReplicationConsumerImpl imp
     }
 
 
-    private void applyModDnOperation( Entry remoteEntry, String entryUuid ) throws Exception
+    private void applyModDnOperation( Entry remoteEntry, String entryUuid, int rid ) throws Exception
     {
         LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry );
 
@@ -863,19 +841,42 @@ public class ReplicationConsumerImpl imp
             // Retrieve locally the moved or renamed entry
             String filter = "(entryUuid=" + entryUuid + ")";
             SearchRequest searchRequest = new SearchRequestImpl();
-            searchRequest.setBase( Dn.ROOT_DSE );
+            searchRequest.setBase( new Dn( schemaManager, config.getBaseDn() ) );
             searchRequest.setFilter( filter );
             searchRequest.setScope( SearchScope.SUBTREE );
             searchRequest.addAttributes( "entryUuid", "entryCsn", "*" );
 
             EntryFilteringCursor cursor = session.search( searchRequest );
             cursor.beforeFirst();
-            cursor.next();
-
-            Entry localEntry = cursor.get();
+            
+            Entry localEntry = null;
+            
+            if ( cursor.next() )
+            {
+                localEntry = cursor.get();
+            }
 
             cursor.close();
 
+            // can happen in MMR scenario
+            if ( localEntry == null )
+            {
+                return;
+            }
+            
+            if ( config.isMmrMode() )
+            {
+                Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+                Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+                
+                if ( localCsn.compareTo( remoteCsn ) >= 0 )
+                {
+                    // just discard the received modified entry, that is old
+                    LOG.debug( "local modification is latest, discarding the modDn operation dn {}", remoteEntry.getDn() );
+                    return;
+                }
+            }
+
             // Compute the DN, parentDn and Rdn for both entries
             Dn localDn = localEntry.getDn();
             Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
@@ -909,16 +910,22 @@ public class ReplicationConsumerImpl imp
             {
                 case MOVE:
                     LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn );
-                    session.move( localDn, remoteParentDn );
-
+                    MoveOperationContext movCtx = new MoveOperationContext( session, localDn, remoteParentDn );
+                    movCtx.setReplEvent( true );
+                    movCtx.setRid( rid );
+                    directoryService.getOperationManager().move( movCtx );
+                    
                     break;
 
                 case RENAME:
                     LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}", new String[]
                         { localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) } );
 
-                    session.rename( localDn, remoteRdn, deleteOldRdn );
-
+                    RenameOperationContext renCtx = new RenameOperationContext( session, localDn, remoteRdn, deleteOldRdn );
+                    renCtx.setReplEvent( true );
+                    renCtx.setRid( rid );
+                    directoryService.getOperationManager().rename( renCtx );
+                    
                     break;
 
                 case MOVE_AND_RENAME:
@@ -931,8 +938,11 @@ public class ReplicationConsumerImpl imp
                                 remoteRdn.getName(),
                                 String.valueOf( deleteOldRdn ) } );
 
-                    session.moveAndRename( localDn, remoteParentDn, remoteRdn, deleteOldRdn );
-
+                    MoveAndRenameOperationContext movRenCtx = new MoveAndRenameOperationContext( session, localDn, remoteParentDn, remoteRdn, deleteOldRdn );
+                    movRenCtx.setReplEvent( true );
+                    movRenCtx.setRid( rid );
+                    directoryService.getOperationManager().moveAndRename( movRenCtx );
+                    
                     break;
             }
         }
@@ -943,12 +953,30 @@ public class ReplicationConsumerImpl imp
     }
 
 
-    private void modify( Entry remoteEntry ) throws Exception
+    private void modify( Entry remoteEntry, int rid ) throws Exception
     {
-        Entry localEntry = session.lookup( remoteEntry.getDn() );
-
+        LookupOperationContext lookupCtx = new LookupOperationContext( session, remoteEntry.getDn() );
+        lookupCtx.setAttrsId( config.getAttributes() );
+        lookupCtx.setSyncreplLookup( true );
+        
+        Entry localEntry = session.getDirectoryService().getOperationManager().lookup( lookupCtx );
+
+        if ( config.isMmrMode() )
+        {
+            Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+            Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+            
+            if ( localCsn.compareTo( remoteCsn ) >= 0 )
+            {
+                // just discard the received modified entry, that is old
+                LOG.debug( "local modification is latest, discarding the modification of dn {}", remoteEntry.getDn() );
+                return;
+            }
+        }
+        
         remoteEntry.removeAttributes( MOD_IGNORE_AT );
-
+        localEntry.removeAttributes( MOD_IGNORE_AT );
+        
         List<Modification> mods = new ArrayList<Modification>();
         Iterator<Attribute> itr = localEntry.iterator();
 
@@ -982,7 +1010,19 @@ public class ReplicationConsumerImpl imp
             }
         }
 
-        session.modify( remoteEntry.getDn(), mods );
+        List<Modification> serverModifications = new ArrayList<Modification>( mods.size() );
+
+        for ( Modification mod : mods )
+        {
+            serverModifications.add( new DefaultModification( directoryService.getSchemaManager(), mod ) );
+        }
+
+        ModifyOperationContext modifyContext = new ModifyOperationContext( session, remoteEntry.getDn(), serverModifications );
+        modifyContext.setReplEvent( true );
+        modifyContext.setRid( rid );
+        
+        OperationManager operationManager = directoryService.getOperationManager();
+        operationManager.modify( modifyContext );
     }
 
 
@@ -990,9 +1030,10 @@ public class ReplicationConsumerImpl imp
      * deletes the entries having the UUID given in the list
      *
      * @param uuidList the list of UUIDs
+     * @param replicaId TODO
      * @throws Exception in case of any problems while deleting the entries
      */
-    public void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent ) throws Exception
+    public void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent, int replicaId ) throws Exception
     {
         if ( uuidList == null || uuidList.isEmpty() )
         {
@@ -1010,7 +1051,7 @@ public class ReplicationConsumerImpl imp
         if ( isRefreshPresent )
         {
             LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() );
-            _deleteEntries_( uuidList, isRefreshPresent );
+            _deleteEntries_( uuidList, isRefreshPresent, replicaId );
             return;
         }
 
@@ -1023,7 +1064,7 @@ public class ReplicationConsumerImpl imp
         for ( ; i < count; i++ )
         {
             startIndex = i * NODE_LIMIT;
-            _deleteEntries_( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent );
+            _deleteEntries_( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent, replicaId );
         }
 
         if ( ( uuidList.size() % NODE_LIMIT ) != 0 )
@@ -1033,7 +1074,7 @@ public class ReplicationConsumerImpl imp
             {
                 startIndex = i * NODE_LIMIT;
             }
-            _deleteEntries_( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent );
+            _deleteEntries_( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId );
         }
     }
 
@@ -1043,8 +1084,9 @@ public class ReplicationConsumerImpl imp
      *
      * @param limitedUuidList a list of UUIDs whose size is less than or equal to #NODE_LIMIT (node limit applies only for refreshDeletes list)
      * @param isRefreshPresent a flag indicating the type of entries present in the UUID list
+     * @param replicaId TODO
      */
-    private void _deleteEntries_( List<byte[]> limitedUuidList, boolean isRefreshPresent ) throws Exception
+    private void _deleteEntries_( List<byte[]> limitedUuidList, boolean isRefreshPresent, int replicaId ) throws Exception
     {
         ExprNode filter = null;
         int size = limitedUuidList.size();
@@ -1094,6 +1136,7 @@ public class ReplicationConsumerImpl imp
             AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET );
         cursor.beforeFirst();
 
+        List<Dn> entryDnLst = new ArrayList<Dn>();
         while ( cursor.next() )
         {
             Entry entry = cursor.get();
@@ -1110,11 +1153,7 @@ public class ReplicationConsumerImpl imp
     {
         /** A field used to tell the thread it should stop */
         private volatile boolean stop = false;
-
-        /** A mutex used to make the thread sleeping for a moment */
-        private final Object mutex = new Object();
-
-
+        
         public RefresherThread()
         {
             setDaemon( true );
@@ -1132,7 +1171,7 @@ public class ReplicationConsumerImpl imp
                     doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false );
 
                     LOG.info( "--------------------- Sleep for a little while ------------------" );
-                    mutex.wait( config.getRefreshInterval() );
+                    Thread.sleep( config.getRefreshInterval() );
                     LOG.debug( "--------------------- syncing again ------------------" );
 
                 }
@@ -1151,11 +1190,22 @@ public class ReplicationConsumerImpl imp
         public void stopRefreshing()
         {
             stop = true;
-
-            // just in case if it is sleeping, wake up the thread
-            mutex.notify();
         }
     }
+    
+    
+    private synchronized Object getLockFor( String uuid )
+    {
+        Object lock = uuidLockMap.get( uuid );
+        
+        if( lock == null )
+        {
+            lock = new Object();
+            uuidLockMap.put( uuid, lock );
+        }
+        
+        return lock;
+    }
 
 
     /**

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplConsumerManager.java Tue Oct 16 13:27:19 2012
@@ -41,6 +41,7 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.model.entry.ModificationOperation;
 import org.apache.directory.shared.ldap.model.entry.StringValue;
 import org.apache.directory.shared.ldap.model.exception.LdapEntryAlreadyExistsException;
+import org.apache.directory.shared.ldap.model.exception.LdapException;
 import org.apache.directory.shared.ldap.model.filter.EqualityNode;
 import org.apache.directory.shared.ldap.model.filter.ExprNode;
 import org.apache.directory.shared.ldap.model.message.AliasDerefMode;
@@ -176,9 +177,7 @@ public class ReplConsumerManager
             SchemaConstants.ADS_REPL_SEARCH_FILTER, replica.getSearchFilter() );
 
         adminSession.add( entry );
-
-        // Last, create a 
-
+        
         LOG.debug( "stored replication consumer entry {}", consumerDn );
     }
 
@@ -189,7 +188,7 @@ public class ReplConsumerManager
      * @param replica The added consumer replica
      * @throws Exception If the addition failed
      */
-    public void deleteConsumerEntry( ReplicaEventLog replica ) throws Exception
+    public void deleteConsumerEntry( ReplicaEventLog replica ) throws LdapException
     {
         if ( replica == null )
         {
@@ -201,7 +200,7 @@ public class ReplConsumerManager
         Dn consumerDn = directoryService.getDnFactory().create(
             SchemaConstants.ADS_DS_REPLICA_ID + "=" + replica.getId() + "," + REPL_CONSUMER_DN );
 
-        PROVIDER_LOG.debug( "Deleting the {} consumer", consumerDn );
+        PROVIDER_LOG.debug( "Trying to delete the consumer entry {}", consumerDn );
 
         if ( !adminSession.exists( consumerDn ) )
         {
@@ -209,7 +208,7 @@ public class ReplConsumerManager
             String message = "The replica " + consumerDn.getName() + " does not exist";
             LOG.error( message );
             PROVIDER_LOG.debug( message );
-            throw new LdapEntryAlreadyExistsException( message );
+            return;
         }
 
         // Delete the consumer entry

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaEventLog.java Tue Oct 16 13:27:19 2012
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * <li>hostname : the consumer's host</li>
  * <li>searchFilter : the filter</li>
  * <li>lastSentCsn : the last CSN sent by the consumer</li>
- * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and presist mode</li>
+ * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and persist mode</li>
  * <li></li>
  * </ul>
  * A separate log is maintained for each syncrepl consumer.<br/>
@@ -65,7 +65,7 @@ public class ReplicaEventLog implements 
     /** A logger for the replication provider */
     private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( "PROVIDER_LOG" );
 
-    /** IP address of the syncrepl consumer */
+    /** hostname of the syncrepl consumer */
     private String hostName;
 
     /** the unmodified search filter as it was when received from the client */
@@ -99,7 +99,8 @@ public class ReplicaEventLog implements 
     /** A flag used to indicate that the consumer is not up to date */
     private volatile boolean dirty;
 
-
+    public static final String REPLICA_EVENT_LOG_NAME_PREFIX = "REPL_EVENT_LOG.";
+    
     /**
      * Creates a new instance of EventLog for a replica
      * @param replicaId The replica ID
@@ -112,16 +113,16 @@ public class ReplicaEventLog implements 
         this.searchCriteria = new NotificationCriteria();
         this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
 
-        // Create the journal file, or open it of it exists
-        File logDir = directoryService.getInstanceLayout().getLogDirectory();
-        journalFile = new File( logDir, "journalRepl." + replicaId );
+        // Create the journal file, or open if already exists
+        File replDir = directoryService.getInstanceLayout().getReplDirectory();
+        journalFile = new File( replDir, REPLICA_EVENT_LOG_NAME_PREFIX + replicaId );
         recman = new BaseRecordManager( journalFile.getAbsolutePath() );
 
         SerializableComparator<String> comparator = new SerializableComparator<String>(
             SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
         comparator.setSchemaManager( schemaManager );
 
-        journal = new JdbmTable<String, ReplicaEventMessage>( schemaManager, "replication", recman, comparator,
+        journal = new JdbmTable<String, ReplicaEventMessage>( schemaManager, journalFile.getName(), recman, comparator,
             StringSerializer.INSTANCE, new ReplicaEventMessageSerializer( schemaManager ) );
     }
 
@@ -131,7 +132,7 @@ public class ReplicaEventLog implements 
      *
      * @param message The message to store
      */
-    public void log( ReplicaEventMessage message )
+    public synchronized void log( ReplicaEventMessage message )
     {
         try
         {
@@ -325,7 +326,7 @@ public class ReplicaEventLog implements 
 
     /**
      * Update the last Sent CSN. If it's different from the present one, we
-     * will set the dirty flag to true, and a replication will follow.
+     * will set the dirty flag to true, and it will be stored in DIT.
      *  
      * @param lastSentCsn The new Sent CSN
      */
@@ -418,7 +419,32 @@ public class ReplicaEventLog implements 
         return new ReplicaJournalCursor( journal, consumerCsn );
     }
 
+    
+    /**
+     * @return the name of this replica log
+     */
+    public String getName()
+    {
+        return journal.getName();
+    }
+
+    
+    /**
+     * @return the number of entries present in the replica log
+     */
+    public synchronized int count()
+    {
+        try
+        {
+            return journal.count();
+        }
+        catch( IOException e )
+        {
+            throw new RuntimeException( e );
+        }
+    }
 
+    
     /**
      * {@inheritDoc}
      */

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/ReplicaJournalCursor.java Tue Oct 16 13:27:19 2012
@@ -21,10 +21,12 @@
 package org.apache.directory.server.ldap.replication.provider;
 
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
 import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
+import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
 import org.apache.directory.shared.ldap.model.cursor.AbstractCursor;
 import org.apache.directory.shared.ldap.model.cursor.Cursor;
 import org.apache.directory.shared.ldap.model.cursor.Tuple;
@@ -60,6 +62,9 @@ public class ReplicaJournalCursor extend
 
     private ReplicaEventMessage qualifiedEvtMsg;
 
+    /** used while cleaning up the log */
+    private boolean skipQualifying;
+
 
     /**
      * Creates a cursor on top of the given journal
@@ -71,9 +76,9 @@ public class ReplicaJournalCursor extend
     {
         if ( IS_DEBUG )
         {
-        	LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
+            LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
         }
-        
+
         this.journal = journal;
         this.tupleCursor = journal.cursor();
         this.consumerCsn = consumerCsn;
@@ -147,34 +152,30 @@ public class ReplicaJournalCursor extend
      * 
      * @throws Exception
      */
-    private void selectQualified() throws Exception
+    private boolean isQualified( String csn, ReplicaEventMessage evtMsg ) throws Exception
     {
-        Tuple<String, ReplicaEventMessage> t = tupleCursor.get();
-
-        qualifiedEvtMsg = t.getValue();
+        LOG.debug( "ReplicaEventMessage: {}", evtMsg );
 
-        LOG.debug( "ReplicaEventMessage: {}", qualifiedEvtMsg );
-
-        if ( qualifiedEvtMsg.isEventOlderThan( consumerCsn ) )
+        if ( evtMsg.isEventOlderThan( consumerCsn ) )
         {
             if ( LOG.isDebugEnabled() )
             {
                 String evt = "MODDN"; // take this as default cause the event type for MODDN is null
 
-                ChangeType changeType = qualifiedEvtMsg.getChangeType();
+                ChangeType changeType = evtMsg.getChangeType();
 
                 if ( changeType != null )
                 {
                     evt = changeType.name();
                 }
 
-                LOG.debug( "event {} for dn {} is not qualified for sending", evt, qualifiedEvtMsg.getEntry().getDn() );
+                LOG.debug( "event {} for dn {} is not qualified for sending", evt, evtMsg.getEntry().getDn() );
             }
 
-            // TODO need to be checked if this causes issues in JDBM
-            journal.remove( t.getKey() );
-            qualifiedEvtMsg = null;
+            return false;
         }
+
+        return true;
     }
 
 
@@ -194,12 +195,28 @@ public class ReplicaJournalCursor extend
     {
         while ( tupleCursor.next() )
         {
-            selectQualified();
+            Tuple<String, ReplicaEventMessage> tuple = tupleCursor.get();
 
-            if ( qualifiedEvtMsg != null )
+            String csn = tuple.getKey();
+            ReplicaEventMessage message = tuple.getValue();
+
+            if ( skipQualifying )
+            {
+                qualifiedEvtMsg = message;
+                return true;
+            }
+
+            boolean qualified = isQualified( csn, message );
+
+            if ( qualified )
             {
+                qualifiedEvtMsg = message;
                 return true;
             }
+            else
+            {
+                journal.remove( csn );
+            }
         }
 
         qualifiedEvtMsg = null;
@@ -225,9 +242,9 @@ public class ReplicaJournalCursor extend
     {
         if ( IS_DEBUG )
         {
-        	LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
+            LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
         }
-        
+
         tupleCursor.close();
         super.close();
     }
@@ -241,15 +258,45 @@ public class ReplicaJournalCursor extend
     {
         if ( IS_DEBUG )
         {
-        	LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
+            LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
         }
-        
+
         tupleCursor.close();
         super.close( cause );
     }
 
 
     /**
+     * sets the flag to skip CSN based checking while traversing
+     * used for internal log cleanup ONLY 
+     */
+    protected void skipQualifyingWhileFetching()
+    {
+        skipQualifying = true;
+    }
+
+
+    /**
+     * delete the current message
+     * used for internal log cleanup ONLY
+     */
+    protected void delete()
+    {
+        try
+        {
+            if ( qualifiedEvtMsg != null )
+            {
+                journal.remove( qualifiedEvtMsg.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+            }
+        }
+        catch ( Exception e )
+        {
+
+        }
+    }
+
+
+    /**
      * {@inheritDoc}
      */
     @Override