You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2012/10/16 15:27:20 UTC

svn commit: r1398782 [2/2] - in /directory/apacheds/trunk: core-api/src/main/java/org/apache/directory/server/core/api/event/ core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/ core-jndi/src/main/java/org/apache/directory/...

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java Tue Oct 16 13:27:19 2012
@@ -26,18 +26,24 @@ import static org.apache.directory.serve
 import static org.apache.directory.server.ldap.LdapServer.NO_TIME_LIMIT;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.directory.server.constants.ServerDNConstants;
 import org.apache.directory.server.core.api.DirectoryService;
+import org.apache.directory.server.core.api.event.DirectoryListenerAdapter;
+import org.apache.directory.server.core.api.event.EventService;
 import org.apache.directory.server.core.api.event.EventType;
 import org.apache.directory.server.core.api.event.NotificationCriteria;
 import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
 import org.apache.directory.server.i18n.I18n;
 import org.apache.directory.server.ldap.LdapProtocolUtils;
 import org.apache.directory.server.ldap.LdapServer;
@@ -84,13 +90,13 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.model.message.SearchScope;
 import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
 import org.apache.directory.shared.ldap.model.message.controls.ManageDsaIT;
+import org.apache.directory.shared.ldap.model.name.Dn;
 import org.apache.directory.shared.ldap.model.schema.AttributeType;
 import org.apache.directory.shared.ldap.model.url.LdapUrl;
 import org.apache.directory.shared.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Class used to process the incoming synchronization request from the consumers.
  *
@@ -126,6 +132,12 @@ public class SyncReplRequestHandler impl
     private ReplConsumerManager replicaUtil;
 
 
+    private ConsumerLogEntryDeleteListener cledListener;
+    
+    private ReplicaEventLogJanitor logJanitor;
+    
+    private AttributeType CSN_AT;
+    
     /**
      * Create a SyncReplRequestHandler empty instance 
      */
@@ -153,12 +165,14 @@ public class SyncReplRequestHandler impl
 
             this.ldapServer = server;
             this.dirService = server.getDirectoryService();
-
+            
+            CSN_AT = dirService.getSchemaManager()
+                .lookupAttributeTypeRegistry( SchemaConstants.ENTRY_CSN_AT );
+            
             OBJECT_CLASS_AT = dirService.getSchemaManager()
                 .lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT );
 
-            File workDir = dirService.getInstanceLayout().getLogDirectory();
-            syncReplData = new File( workDir, "syncrepl-data" );
+            syncReplData = dirService.getInstanceLayout().getReplDirectory();
 
             if ( !syncReplData.exists() )
             {
@@ -172,8 +186,18 @@ public class SyncReplRequestHandler impl
 
             loadReplicaInfo();
 
+            logJanitor = new ReplicaEventLogJanitor( replicaLogMap );
+            logJanitor.start();
+            
             registerPersistentSearches();
 
+            cledListener = new ConsumerLogEntryDeleteListener();
+            NotificationCriteria criteria = new NotificationCriteria();
+            criteria.setBase( new Dn( dirService.getSchemaManager(), ServerDNConstants.REPL_CONSUMER_DN_STR ) );
+            criteria.setEventMask( EventType.DELETE );
+            
+            dirService.getEventService().addListener( cledListener, criteria );
+            
             Thread consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask() );
             consumerInfoUpdateThread.setDaemon( true );
             consumerInfoUpdateThread.start();
@@ -196,21 +220,27 @@ public class SyncReplRequestHandler impl
      */
     public void stop()
     {
+        logJanitor.stopCleaning();
+        
+        EventService evtSrv = dirService.getEventService();
+        
         for ( ReplicaEventLog log : replicaLogMap.values() )
         {
             try
             {
                 PROVIDER_LOG.debug( "Stopping the logging for replica ", log.getId() );
+                evtSrv.removeListener( log.getPersistentListener() );
                 log.stop();
             }
             catch ( Exception e )
             {
-                LOG.warn( "Failed to close the event log {}", log.getId() );
-                LOG.warn( "", e );
+                LOG.warn( "Failed to close the event log {}", log.getId(), e );
                 PROVIDER_LOG.error( "Failed to close the event log {}", log.getId(), e );
             }
         }
-
+        
+        evtSrv.removeListener( cledListener );
+        
         initialized = false;
     }
 
@@ -290,57 +320,62 @@ public class SyncReplRequestHandler impl
     /**
      * Send all the stored modifications to the consumer
      */
-    private String sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog,
-        String consumerCsn )
+    private void sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog, String fromCsn )
         throws Exception
     {
         // do the search from the log
-        String lastSentCsn = clientMsgLog.getLastSentCsn();
-
-        ReplicaJournalCursor cursor = clientMsgLog.getCursor( consumerCsn );
+        String lastSentCsn = fromCsn;
 
+        ReplicaJournalCursor cursor = clientMsgLog.getCursor( fromCsn );
+        
         PROVIDER_LOG.debug( "Processing the log for replica {}", clientMsgLog.getId() );
 
-        while ( cursor.next() )
+        try
         {
-            ReplicaEventMessage replicaEventMessage = cursor.get();
-            Entry entry = replicaEventMessage.getEntry();
-            LOG.debug( "Read message from the queue {}", entry );
-            PROVIDER_LOG.debug( "Read message from the queue {}", entry );
-
-            lastSentCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
-
-            ChangeType event = replicaEventMessage.getChangeType();
-
-            // if event type is null, then it is a MODDN operation
-            if ( event == ChangeType.MODDN )
-            {
-                sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.MODIFY );
-            }
-            else
+            while ( cursor.next() )
             {
+                ReplicaEventMessage replicaEventMessage = cursor.get();
+                Entry entry = replicaEventMessage.getEntry();
+                LOG.debug( "Read message from the queue {}", entry );
+                PROVIDER_LOG.debug( "Read message from the queue {}", entry );
+                
+                lastSentCsn = entry.get( CSN_AT ).getString();
+                
+                ChangeType event = replicaEventMessage.getChangeType();
+                
                 SyncStateTypeEnum syncStateType = null;
-
+                
                 switch ( event )
                 {
-                    case ADD:
-                    case MODIFY:
+                    case ADD :
                         syncStateType = SyncStateTypeEnum.ADD;
                         break;
-
-                    case DELETE:
+                        
+                    case MODIFY :
+                        syncStateType = SyncStateTypeEnum.MODIFY;
+                        break;
+                        
+                    case MODDN :
+                        syncStateType = SyncStateTypeEnum.MODDN;
+                        
+                    case DELETE :
                         syncStateType = SyncStateTypeEnum.DELETE;
                         break;
                 }
-
+                
                 sendSearchResultEntry( session, req, entry, syncStateType );
+                
+                clientMsgLog.setLastSentCsn( lastSentCsn );
+                
+                PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN : {}", clientMsgLog.getId(), lastSentCsn );
             }
+            
+            PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId() );
+        }
+        finally
+        {
+            cursor.close();
         }
-
-        PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId() );
-        cursor.close();
-
-        return lastSentCsn;
     }
 
 
@@ -351,57 +386,56 @@ public class SyncReplRequestHandler impl
     private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog, String consumerCsn )
         throws Exception
     {
-        boolean refreshNPersist = isRefreshNPersist( req );
-
-        // if this method is called with refreshAndPersist  
-        // means the client was offline after it initiated a persistent synch session
-        // we need to update the handler's session 
-        if ( refreshNPersist )
-        {
-            SyncReplSearchListener handler = replicaLog.getPersistentListener();
-            handler.setSearchRequest( req );
-            handler.setSession( session );
-        }
-
-        String lastSentCsn = sendContentFromLog( session, req, replicaLog, consumerCsn );
-
-        PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN : {}", replicaLog.getId(),
-            lastSentCsn );
-        byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn );
-
-        if ( refreshNPersist )
-        {
-            IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() );
-            intermResp.setResponseName( SyncInfoValue.OID );
-
-            SyncInfoValue syncInfo = new SyncInfoValueDecorator( ldapServer.getDirectoryService()
-                .getLdapCodecService(),
-                SynchronizationInfoEnum.NEW_COOKIE );
-            syncInfo.setCookie( cookie );
-            intermResp.setResponseValue( ( ( SyncInfoValueDecorator ) syncInfo ).getValue() );
-
-            PROVIDER_LOG
-                .debug( "Sent the intermediate response to the {} consumer, {}", replicaLog.getId(), intermResp );
-            session.getIoSession().write( intermResp );
-
-            replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
-        }
-        else
+        synchronized ( replicaLog )
         {
-            SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
-            searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
-            SyncDoneValue syncDone = new SyncDoneValueDecorator(
-                ldapServer.getDirectoryService().getLdapCodecService() );
-            syncDone.setCookie( cookie );
-            searchDoneResp.addControl( syncDone );
-
-            PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer", replicaLog.getId(),
-                searchDoneResp );
-
-            session.getIoSession().write( searchDoneResp );
+            boolean refreshNPersist = isRefreshNPersist( req );
+            
+            // if this method is called with refreshAndPersist  
+            // means the client was offline after it initiated a persistent synch session
+            // we need to update the handler's session 
+            if ( refreshNPersist )
+            {
+                SyncReplSearchListener handler = replicaLog.getPersistentListener();
+                handler.setSearchRequest( req );
+                handler.setSession( session );
+            }
+            
+            sendContentFromLog( session, req, replicaLog, consumerCsn );
+            
+            String lastSentCsn = replicaLog.getLastSentCsn();
+            
+            byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn );
+            
+            if ( refreshNPersist )
+            {
+                IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() );
+                intermResp.setResponseName( SyncInfoValue.OID );
+                
+                SyncInfoValue syncInfo = new SyncInfoValueDecorator( ldapServer.getDirectoryService()
+                    .getLdapCodecService(),
+                    SynchronizationInfoEnum.NEW_COOKIE );
+                syncInfo.setCookie( cookie );
+                intermResp.setResponseValue( ((SyncInfoValueDecorator)syncInfo).getValue() );
+                
+                PROVIDER_LOG.debug( "Sent the intermediate response to the {} consumer, {}", replicaLog.getId(), intermResp );
+                session.getIoSession().write( intermResp );
+                
+                replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
+            }
+            else
+            {
+                SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
+                searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
+                SyncDoneValue syncDone = new SyncDoneValueDecorator( 
+                    ldapServer.getDirectoryService().getLdapCodecService() );
+                syncDone.setCookie( cookie );
+                searchDoneResp.addControl( syncDone );
+                
+                PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer", replicaLog.getId(), searchDoneResp );
+                
+                session.getIoSession().write( searchDoneResp );
+            }
         }
-
-        replicaLog.setLastSentCsn( lastSentCsn );
     }
 
 
@@ -428,7 +462,7 @@ public class SyncReplRequestHandler impl
         StringValue contexCsnValue = new StringValue( contextCsn );
 
         // modify the filter to include the context Csn
-        GreaterEqNode csnGeNode = new GreaterEqNode( SchemaConstants.ENTRY_CSN_AT, contexCsnValue );
+        GreaterEqNode csnGeNode = new GreaterEqNode( CSN_AT, contexCsnValue );
         ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
         request.setFilter( postInitContentFilter );
 
@@ -457,32 +491,36 @@ public class SyncReplRequestHandler impl
         dirService.getEventService().addListener( handler, criteria );
 
         // then start pushing initial content
-        LessEqNode csnNode = new LessEqNode( SchemaConstants.ENTRY_CSN_AT, contexCsnValue );
+        LessEqNode csnNode = new LessEqNode( CSN_AT, contexCsnValue );
 
         // modify the filter to include the context Csn
         ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
         request.setFilter( initialContentFilter );
 
         // Now, do a search to get all the entries
-        SearchResultDone searchDoneResp = doSimpleSearch( session, request );
+        SearchResultDone searchDoneResp = doSimpleSearch( session, request, replicaLog );
 
         if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
         {
-            replicaLog.setLastSentCsn( contextCsn );
+            if( replicaLog.getLastSentCsn() == null )
+            {
+                replicaLog.setLastSentCsn( contextCsn );
+            }
+            
             byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
 
             if ( refreshNPersist ) // refreshAndPersist mode
             {
-                contextCsn = sendContentFromLog( session, request, replicaLog, contextCsn );
-                cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
+                sendContentFromLog( session, request, replicaLog, contextCsn );
+                cookie = LdapProtocolUtils.createCookie(replicaLog.getId(), replicaLog.getLastSentCsn());
 
                 IntermediateResponse intermResp = new IntermediateResponseImpl( request.getMessageId() );
                 intermResp.setResponseName( SyncInfoValue.OID );
 
-                SyncInfoValue syncInfo = new SyncInfoValueDecorator(
+                SyncInfoValue syncInfo = new SyncInfoValueDecorator( 
                     ldapServer.getDirectoryService().getLdapCodecService(), SynchronizationInfoEnum.NEW_COOKIE );
                 syncInfo.setCookie( cookie );
-                intermResp.setResponseValue( ( ( SyncInfoValueDecorator ) syncInfo ).getValue() );
+                intermResp.setResponseValue( ((SyncInfoValueDecorator)syncInfo).getValue() );
 
                 PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}", replicaLog, syncInfo );
 
@@ -498,8 +536,7 @@ public class SyncReplRequestHandler impl
                     ldapServer.getDirectoryService().getLdapCodecService() );
                 syncDone.setCookie( cookie );
                 searchDoneResp.addControl( syncDone );
-                PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog,
-                    searchDoneResp );
+                PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog, searchDoneResp );
 
                 session.getIoSession().write( searchDoneResp );
             }
@@ -511,7 +548,7 @@ public class SyncReplRequestHandler impl
                 .getResultCode() );
             PROVIDER_LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult()
                 .getResultCode() );
-            replicaLog.truncate();
+            replicaLog.stop();
             replicaLog = null;
 
             // remove the listener
@@ -521,7 +558,7 @@ public class SyncReplRequestHandler impl
         }
 
         // if all is well then store the consumer information
-        replicaUtil.addConsumerEntry( replicaLog );
+        replicaUtil.addConsumerEntry(replicaLog );
 
         // add to the map only after storing in the DIT, else the Replica update thread barfs
         replicaLogMap.put( replicaLog.getId(), replicaLog );
@@ -532,7 +569,7 @@ public class SyncReplRequestHandler impl
      * Process a search on the provider to get all the modified entries. We then send all
      * of them to the consumer
      */
-    private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req ) throws Exception
+    private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog ) throws Exception
     {
         SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
         LdapResult ldapResult = searchDoneResp.getLdapResult();
@@ -562,7 +599,7 @@ public class SyncReplRequestHandler impl
             LOG.debug( "using <{},{}> for size limit", requestLimit, serverLimit );
             long sizeLimit = min( requestLimit, serverLimit );
 
-            readResults( session, req, ldapResult, cursor, sizeLimit );
+            readResults( session, req, ldapResult, cursor, sizeLimit, replicaLog );
         }
         finally
         {
@@ -587,10 +624,11 @@ public class SyncReplRequestHandler impl
      * Process the results get from a search request. We will send them to the client.
      */
     private void readResults( LdapSession session, SearchRequest req, LdapResult ldapResult,
-        EntryFilteringCursor cursor, long sizeLimit ) throws Exception
+        EntryFilteringCursor cursor, long sizeLimit, ReplicaEventLog replicaLog ) throws Exception
     {
         long count = 0;
 
+        
         while ( ( count < sizeLimit ) && cursor.next() )
         {
             // Handle closed session
@@ -598,8 +636,7 @@ public class SyncReplRequestHandler impl
             {
                 // The client has closed the connection
                 LOG.debug( "Request terminated for message {}, the client has closed the session", req.getMessageId() );
-                PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed the session",
-                    req.getMessageId() );
+                PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed the session", req.getMessageId() );
                 break;
             }
 
@@ -615,6 +652,9 @@ public class SyncReplRequestHandler impl
 
             sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.ADD );
 
+            String lastSentCsn = entry.get( CSN_AT ).getString();
+            replicaLog.setLastSentCsn( lastSentCsn );
+            
             count++;
         }
 
@@ -894,7 +934,8 @@ public class SyncReplRequestHandler impl
         try
         {
             List<ReplicaEventLog> eventLogs = replicaUtil.getReplicaEventLogs();
-
+            List<String> eventLogNames = new ArrayList<String>();
+            
             if ( !eventLogs.isEmpty() )
             {
                 for ( ReplicaEventLog replica : eventLogs )
@@ -902,6 +943,7 @@ public class SyncReplRequestHandler impl
                     LOG.debug( "initializing the replica log from {}", replica.getId() );
                     PROVIDER_LOG.debug( "initializing the replica log from {}", replica.getId() );
                     replicaLogMap.put( replica.getId(), replica );
+                    eventLogNames.add( replica.getName() );
 
                     // update the replicaCount's value to assign a correct value to the new replica(s) 
                     if ( replicaCount.get() < replica.getId() )
@@ -915,6 +957,17 @@ public class SyncReplRequestHandler impl
                 LOG.debug( "no replica logs found to initialize" );
                 PROVIDER_LOG.debug( "no replica logs found to initialize" );
             }
+            
+            // remove unused logs
+            File[] replicaLogNames = getAllReplJournalNames();
+            for( File f : replicaLogNames )
+            {
+                if( !eventLogNames.contains( f.getName() ) )
+                {
+                    f.delete();
+                    LOG.info( "removed unused replication event log {}", f );
+                }
+            }
         }
         catch ( Exception e )
         {
@@ -946,9 +999,8 @@ public class SyncReplRequestHandler impl
             {
                 LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(), log
                     .getId() );
-                PROVIDER_LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(),
-                    log
-                        .getId() );
+                PROVIDER_LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(), log
+                    .getId() );
             }
         }
     }
@@ -1042,4 +1094,66 @@ public class SyncReplRequestHandler impl
 
         return control.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST;
     }
+    
+    
+    private File[] getAllReplJournalNames()
+    {
+        File replDir = dirService.getInstanceLayout().getReplDirectory();
+        FilenameFilter filter = new FilenameFilter()
+        {
+            @Override
+            public boolean accept( File dir, String name )
+            {
+                return name.startsWith( ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX );
+            }
+        };
+        
+        return replDir.listFiles( filter );
+    }
+    
+    
+    /**
+     * an event listener for handling deletions of replication event log entries present under ou=consumers,ou=system
+     */
+    private class ConsumerLogEntryDeleteListener extends DirectoryListenerAdapter
+    {
+        @Override
+        public void entryDeleted( DeleteOperationContext deleteContext )
+        {
+            Dn consumerLogDn = deleteContext.getDn();
+            String name = ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX + consumerLogDn.getRdn().getValue().getString();
+            // lock this listener instance
+            synchronized ( this )
+            {
+                for( ReplicaEventLog log : replicaLogMap.values() )
+                {
+                    if( name.equalsIgnoreCase( log.getName() ) )
+                    {
+                        synchronized ( log )
+                        {
+                            dirService.getEventService().removeListener( log.getPersistentListener() );
+                            LOG.debug( "removed the persistent listener for replication event log {}", consumerLogDn );
+                            
+                            replicaLogMap.remove( log.getId() );
+                            try
+                            {
+                                // get the correct name, just incase cause we used equalsIgnoreCase
+                                name = log.getName();
+                                log.stop();
+                                
+                                new File( dirService.getInstanceLayout().getReplDirectory(), name ).delete();
+                                LOG.info( "successfully removed replication event log {}", consumerLogDn );
+                            }
+                            catch( Exception e )
+                            {
+                                LOG.warn( "Closing the replication event log of the entry {} was not successful, will be removed anyway", consumerLogDn, e );
+                            }
+                        }
+                        
+                        break;
+                    }
+                } // end of for
+            } // end of synchronized block
+        } // end of delete method
+    }
 }

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java Tue Oct 16 13:27:19 2012
@@ -20,10 +20,12 @@
 package org.apache.directory.server.ldap.replication.provider;
 
 
+import org.apache.directory.server.constants.ServerDNConstants;
 import org.apache.directory.server.core.api.DirectoryService;
 import org.apache.directory.server.core.api.entry.ClonedServerEntry;
 import org.apache.directory.server.core.api.event.DirectoryListener;
 import org.apache.directory.server.core.api.event.EventType;
+import org.apache.directory.server.core.api.interceptor.context.AbstractChangeOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
@@ -67,7 +69,7 @@ public class SyncReplSearchListener impl
 
     /** The ldap session */
     private LdapSession session;
-
+    
     /** The search request we are processing */
     private SearchRequest searchRequest;
 
@@ -76,8 +78,11 @@ public class SyncReplSearchListener impl
 
     /** The consumer configuration */
     private final ReplicaEventLog consumerMsgLog;
-
-
+    
+    private static String replConsumerConfigDn = ServerDNConstants.REPL_CONSUMER_CONFIG_DN.toLowerCase();
+    private static String schemaDn = SchemaConstants.OU_SCHEMA.toLowerCase();
+    private static String replConsumerDn = ServerDNConstants.REPL_CONSUMER_DN_STR.toLowerCase();
+    
     /**
      * Create a new instance of a consumer listener
      * 
@@ -114,7 +119,7 @@ public class SyncReplSearchListener impl
     public void setSearchRequest( SearchRequest searchRequest )
     {
         this.searchRequest = searchRequest;
-
+        
         if ( searchRequest != null )
         {
             searchRequest.addAbandonListener( this );
@@ -122,7 +127,14 @@ public class SyncReplSearchListener impl
     }
 
 
-    /**
+    @Override
+	public boolean isSynchronous()
+    {
+		return true; // always synchronous
+	}
+
+
+	/**
      * Abandon a SearchRequest
      * 
      * @param searchRequest The SearchRequest to abandon
@@ -156,11 +168,11 @@ public class SyncReplSearchListener impl
         }
     }
 
-
+    
     /**
      * Create the SyncStateValue control
      */
-    private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry )
+    private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry ) 
         throws LdapInvalidAttributeValueException
     {
         SyncStateValue syncStateValue = new SyncStateValueDecorator( directoryService.getLdapCodecService() );
@@ -169,15 +181,15 @@ public class SyncReplSearchListener impl
         String uuidStr = entry.get( SchemaConstants.ENTRY_UUID_AT ).getString();
         syncStateValue.setEntryUUID( Strings.uuidToBytes( uuidStr ) );
         syncStateValue.setCookie( getCookie( entry ) );
-
+        
         return syncStateValue;
     }
-
-
+    
+    
     /**
      * Send the result to the consumer. If the consumer has disconnected, we fail back to the queue.
      */
-    private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType,
+    private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType, 
         SyncStateValue syncStateValue )
     {
         searchResultEntry.addControl( syncStateValue );
@@ -188,7 +200,7 @@ public class SyncReplSearchListener impl
         // Now, send the entry to the consumer
         handleWriteFuture( future, entry, eventType );
     }
-
+    
 
     /**
      * Process a ADD operation. The added entry is pushed to the consumer if it's connected,
@@ -199,13 +211,17 @@ public class SyncReplSearchListener impl
     public void entryAdded( AddOperationContext addContext )
     {
         Entry entry = addContext.getEntry();
+        
+        if ( isConfigEntry( entry ) || isNotValidForReplication( addContext ) )
+        {
+            return;
+        }
 
         try
         {
             //System.out.println( "ADD Listener : log " + entry.getDn() );
             // we log it first
-            consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, ( ( ClonedServerEntry ) entry )
-                .getClonedEntry() ) );
+            consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, ((ClonedServerEntry)entry).getClonedEntry() ) );
 
             // We send the added entry directly to the consumer if it's connected
             if ( pushInRealTime )
@@ -216,12 +232,11 @@ public class SyncReplSearchListener impl
                 resultEntry.setEntry( entry );
 
                 // Create the control which will be added to the response.
-                SyncStateValue syncAdd = createControl( session.getCoreSession().getDirectoryService(),
-                    SyncStateTypeEnum.ADD, entry );
-
+                SyncStateValue syncAdd = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.ADD, entry );
+                
                 sendResult( resultEntry, entry, EventType.ADD, syncAdd );
             }
-
+            
         }
         catch ( LdapInvalidAttributeValueException e )
         {
@@ -240,9 +255,15 @@ public class SyncReplSearchListener impl
     public void entryDeleted( DeleteOperationContext deleteContext )
     {
         Entry entry = deleteContext.getEntry();
+        
+        if ( isConfigEntry( entry ) || isNotValidForReplication( deleteContext ) )
+        {
+            return;
+        }
+        
         sendDeletedEntry( entry );
     }
-
+    
 
     /**
      * A helper method, as the delete opertaionis used by the ModDN operations.
@@ -253,15 +274,14 @@ public class SyncReplSearchListener impl
         {
             //System.out.println( "DELETE Listener : log " + entry.getDn() );
             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.DELETE, entry ) );
-
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
                 resultEntry.setObjectName( entry.getDn() );
                 resultEntry.setEntry( entry );
 
-                SyncStateValue syncDelete = createControl( session.getCoreSession().getDirectoryService(),
-                    SyncStateTypeEnum.DELETE, entry );
+                SyncStateValue syncDelete = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.DELETE, entry );
 
                 sendResult( resultEntry, entry, EventType.DELETE, syncDelete );
             }
@@ -284,11 +304,16 @@ public class SyncReplSearchListener impl
     {
         Entry alteredEntry = modifyContext.getAlteredEntry();
 
+        if ( isConfigEntry( alteredEntry ) || isNotValidForReplication( modifyContext ) )
+        {
+            return;
+        }
+
         try
         {
             //System.out.println( "MODIFY Listener : log " + alteredEntry.getDn() );
             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODIFY, alteredEntry ) );
-
+            
             if ( pushInRealTime )
             {
 
@@ -296,8 +321,7 @@ public class SyncReplSearchListener impl
                 resultEntry.setObjectName( modifyContext.getDn() );
                 resultEntry.setEntry( alteredEntry );
 
-                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
-                    SyncStateTypeEnum.MODIFY, alteredEntry );
+                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODIFY, alteredEntry );
 
                 sendResult( resultEntry, alteredEntry, EventType.MODIFY, syncModify );
             }
@@ -320,6 +344,11 @@ public class SyncReplSearchListener impl
         // should always send the modified entry cause the consumer perform the modDn operation locally
         Entry entry = moveContext.getModifiedEntry();
 
+        if ( isConfigEntry( entry ) || isNotValidForReplication( moveContext ) )
+        {
+            return;
+        }
+
         try
         {
             if ( !moveContext.getNewSuperior().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
@@ -330,17 +359,16 @@ public class SyncReplSearchListener impl
 
             //System.out.println( "MOVE Listener : log " + moveContext.getDn() + " moved to " + moveContext.getNewSuperior() );
             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
-
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
                 resultEntry.setObjectName( moveContext.getDn() );
                 resultEntry.setEntry( entry );
 
-                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
-                    SyncStateTypeEnum.MODDN, entry );
+                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
 
-                sendResult( resultEntry, entry, null, syncModify );
+                sendResult( resultEntry, entry, EventType.MOVE, syncModify );
             }
         }
         catch ( Exception e )
@@ -358,31 +386,36 @@ public class SyncReplSearchListener impl
      */
     public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext )
     {
+        // should always send the modified entry cause the consumer perform the modDn operation locally
+        Entry entry = moveAndRenameContext.getModifiedEntry();
+
+        if ( isConfigEntry( entry ) || isNotValidForReplication( moveAndRenameContext ) )
+        {
+            return;
+        }
+
         try
         {
             if ( !moveAndRenameContext.getNewSuperiorDn().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
             {
-                sendDeletedEntry( moveAndRenameContext.getEntry() );
+                sendDeletedEntry( entry );
                 return;
             }
 
-            // should always send the modified entry cause the consumer perform the modDn operation locally
-            Entry entry = moveAndRenameContext.getModifiedEntry();
 
             //System.out.println( "MOVE AND RENAME Listener : log " + moveAndRenameContext.getDn() + 
             //    " moved to " + moveAndRenameContext.getNewSuperiorDn() + " renamed to " + moveAndRenameContext.getNewRdn() );
             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
-
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
                 resultEntry.setObjectName( entry.getDn() );
                 resultEntry.setEntry( entry );
 
-                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
-                    SyncStateTypeEnum.MODDN, entry );
+                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
 
-                sendResult( resultEntry, entry, null, syncModify );
+                sendResult( resultEntry, entry, EventType.MOVE_AND_RENAME, syncModify );
             }
         }
         catch ( Exception e )
@@ -403,25 +436,29 @@ public class SyncReplSearchListener impl
         // should always send the modified entry cause the consumer perform the modDn operation locally
         Entry entry = renameContext.getModifiedEntry();
 
+        if ( isConfigEntry( entry ) || isNotValidForReplication( renameContext ) )
+        {
+            return;
+        }
+
         try
         {
             // should always send the original entry cause the consumer perform the modDn operation there
             //System.out.println( "RENAME Listener : log " + renameContext.getDn() + " renamed to " + renameContext.getNewRdn() );
             consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
-
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
                 resultEntry.setObjectName( entry.getDn() );
                 resultEntry.setEntry( entry );
 
-                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
-                    SyncStateTypeEnum.MODDN, entry );
-
+                SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
+                
                 // In this case, the cookie is different
                 syncModify.setCookie( getCookie( entry ) );
 
-                sendResult( resultEntry, entry, null, syncModify );
+                sendResult( resultEntry, entry, EventType.RENAME, syncModify );
             }
         }
         catch ( Exception e )
@@ -469,34 +506,113 @@ public class SyncReplSearchListener impl
         // Let the operation be executed.
         // Note : we wait 10 seconds max
         future.awaitUninterruptibly( 10000L );
-
+        
         if ( !future.isWritten() )
         {
-            LOG.error( "Failed to write to the consumer {} during the event {} on entry {}", new Object[]
-                {
-                    consumerMsgLog.getId(), event, entry.getDn() } );
+            LOG.error( "Failed to write to the consumer {} during the event {} on entry {}", new Object[] { 
+                           consumerMsgLog.getId(), event, entry.getDn() } );
             LOG.error( "", future.getException() );
 
             // set realtime push to false, will be set back to true when the client
             // comes back and sends another request this flag will be set to true
             pushInRealTime = false;
         }
+        else
+        {
+            try
+            {
+                // if successful update the last sent CSN
+                consumerMsgLog.setLastSentCsn( entry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+            }
+            catch( Exception e )
+            {
+                //should never happen
+                LOG.error( "No entry CSN attribute found", e );
+            }
+        }
+    }
+    
+    
+    /**
+     * checks if the given entry belongs to the ou=config or ou=schema partition
+     * We don't replicate those two partitions
+     * @param entry the entry
+     * @return true if the entry belongs to ou=config partition, false otherwise
+     */
+    private boolean isConfigEntry( Entry entry )
+    {
+        // we can do Dn.isDescendantOf but in this part of the
+        // server the DNs are all normalized and a simple string compare should
+        // do the trick
+        
+        String name = entry.getDn().getName().toLowerCase();
+        
+        if ( name.endsWith( replConsumerConfigDn ) ||
+             name.endsWith( schemaDn ) ||
+             name.endsWith( replConsumerDn ) )
+        {
+            return true;
+        }
+        
+        // do not replicate the changes made to transport config entries
+        if( name.startsWith( "ads-transportid" ) && name.endsWith( ServerDNConstants.CONFIG_DN ) )
+        {
+            return true;
+        }
+        
+        return false;
+    }
+    
+    
+    private boolean isNotValidForReplication( AbstractChangeOperationContext ctx )
+    {
+        if( ctx.isGenerateNoReplEvt() )
+        {
+            return true;
+        }
+        
+        return isMmrConfiguredToReceiver( ctx );
     }
-
 
     /**
+     * checks if the sender of this replication event is setup with MMR
+     * (Note: this method is used to prevent sending a repicated event back to the sender after 
+     *  performing local update)
+     * @param ctx the operation's context
+     * @return true if the rid present in operation context is same as the event log's ID, false otherwise
+     */
+    private boolean isMmrConfiguredToReceiver( AbstractChangeOperationContext ctx )
+    {
+        
+        if( ctx.isReplEvent() )
+        {
+            boolean skip = ( ctx.getRid() == consumerMsgLog.getId() );
+            
+            if( skip )
+            {
+                LOG.debug( "RID in operation context matches with the ID of replication event log {} for host {}", consumerMsgLog.getName(), consumerMsgLog.getHostName() );
+            }
+            
+            return skip;
+        }
+        
+        return false;
+    }
+    
+    
+    /**
      * {@inheritDoc}
      */
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-
+        
         sb.append( "SyncReplSearchListener : \n" );
         sb.append( '\'' ).append( searchRequest ).append( "', " );
         sb.append( '\'' ).append( pushInRealTime ).append( "', \n" );
         sb.append( consumerMsgLog );
         sb.append( '\n' );
-
+        
         return sb.toString();
     }
 }