You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2010/01/20 13:19:33 UTC

svn commit: r901154 - in /directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl: SyncReplConsumer.java SyncreplRunnerUI.java

Author: kayyagari
Date: Wed Jan 20 12:19:32 2010
New Revision: 901154

URL: http://svn.apache.org/viewvc?rev=901154&view=rev
Log:
o started handling refreshOnly mode of synchronization
o lots of code reorg and formatting

Modified:
    directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java
    directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java

Modified: directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java?rev=901154&r1=901153&r2=901154&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java (original)
+++ directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java Wed Jan 20 12:19:32 2010
@@ -23,7 +23,6 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -53,7 +52,6 @@
 import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlDecoder;
 import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlCodec;
 import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlDecoder;
-import org.apache.directory.shared.ldap.constants.SchemaConstants;
 import org.apache.directory.shared.ldap.entry.Entry;
 import org.apache.directory.shared.ldap.entry.EntryAttribute;
 import org.apache.directory.shared.ldap.entry.Modification;
@@ -107,27 +105,35 @@
 
     /** the schema manager */
     private SchemaManager schemaManager;
-    
+
     /** the decoder for syncinfovalue control */
     private SyncInfoValueControlDecoder decoder = new SyncInfoValueControlDecoder();
 
     /** the cookie file */
     private File cookieFile;
-    
+
     /** flag to indicate whether the consumer was diconncted */
     private boolean disconnected;
 
     /** the core session */
     private CoreSession session;
-    
+
     private SyncDoneValueControlDecoder syncDoneControlDecoder = new SyncDoneValueControlDecoder();
-    
+
     private SyncStateValueControlDecoder syncStateControlDecoder = new SyncStateValueControlDecoder();
 
     /** attributes on which modification should be ignored */
-    private static final String[] MOD_IGNORE_AT = new String[]{ "entryUUID", "entryCSN"}; //{ "1.3.6.1.1.16.4", "1.3.6.1.4.1.4203.666.1.7" };
-    
-    
+    private static final String[] MOD_IGNORE_AT = new String[] { "entryUUID", "entryCSN" }; //{ "1.3.6.1.1.16.4", "1.3.6.1.4.1.4203.666.1.7" };
+
+    /** flag to indicate whether the current phase is for deleting entries */
+    private boolean refreshDeletes;
+
+    /** flag set after receiving refreshPresent Sync Info message */
+    private boolean refreshDone;
+
+    private RefresherThread refreshThread;
+
+
     /**
      * @return the config
      */
@@ -146,27 +152,17 @@
     }
 
 
-    /**
-     * A helper method to quickly quit the program
-     */
-    private static void quit( LdapConnection connection ) throws IOException
-    {
-        connection.close();
-        System.exit( 1 );
-    }
-
-
     public void init( DirectoryService directoryservice ) throws Exception
     {
         this.directoryService = directoryservice;
 
         File cookieDir = new File( directoryservice.getWorkingDirectory(), "cookies" );
         cookieDir.mkdir();
-        
+
         cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) );
-        
+
         session = directoryService.getAdminSession();
-        
+
         schemaManager = directoryservice.getSchemaManager();
     }
 
@@ -179,7 +175,7 @@
             int port = config.getPort();
 
             // Create a connection
-            if( connection == null )
+            if ( connection == null )
             {
                 connection = new LdapConnection( providerHost, port );
             }
@@ -225,7 +221,7 @@
         String baseDn = config.getBaseDn();
 
         searchRequest = new SearchRequest();
-        
+
         searchRequest.setBaseDn( baseDn );
         searchRequest.setFilter( config.getFilter() );
         searchRequest.setSizeLimit( config.getSearchSizeLimit() );
@@ -236,16 +232,7 @@
         searchRequest.setScope( SearchScope.getSearchScope( config.getSearchScope() ) );
         searchRequest.setTypesOnly( false );
 
-        String[] attributes = config.getAttributes();
-        
-        if ( attributes == null )
-        {
-            searchRequest.addAttributes( SchemaConstants.ALL_USER_ATTRIBUTES );
-        }
-        else
-        {
-            searchRequest.addAttributes( attributes );
-        }
+        searchRequest.addAttributes( config.getAttributes() );
 
         syncReq = new SyncRequestValueControl();
 
@@ -259,33 +246,21 @@
         }
 
         syncReq.setReloadHint( false );
-
-        Control control = new SyncRequestValueControl();
-        ((SyncRequestValueControl)control).setCookie( syncReq.getEncodedValue() );
-
-        try
-        {
-            searchRequest.add( control );
-        }
-        catch( LdapException e )
-        {
-            // shouldn't happen
-            LOG.error( "Failed to add constrol to the search request", e );
-        }
     }
 
 
     public void handleSearchDone( SearchResultDone searchDone )
     {
         LOG.debug( "///////////////// handleSearchDone //////////////////" );
-        
+
         Control ctrl = searchDone.getControl( SyncDoneValueControl.CONTROL_OID );
         SyncDoneValueControlCodec syncDoneCtrl = null;
         try
         {
             syncDoneCtrl = ( SyncDoneValueControlCodec ) syncDoneControlDecoder.decode( ctrl.getEncodedValue() );
+            refreshDeletes = syncDoneCtrl.isRefreshDeletes();
         }
-        catch( Exception e )
+        catch ( Exception e )
         {
             LOG.error( "Failed to decode the syncDoneControlCodec", e );
         }
@@ -295,36 +270,6 @@
             syncCookie = syncDoneCtrl.getCookie();
             LOG.debug( "assigning cookie from sync done value control: " + StringTools.utf8ToString( syncCookie ) );
         }
-        else
-        {
-            LOG.info( "cookie in syncdone message is null" );
-        }
-
-        if ( !config.isRefreshPersist() )
-        {
-            // Now, switch to refreshAndPresist
-            config.setRefreshPersist( true );
-            LOG.debug( "Swithing to RefreshAndPersist" );
-         
-            try
-            {
-                // the below call is required to send the updated cookie
-                // and refresh mode change (i.e to refreshAndPersist stage)
-                // cause while the startSync() method sleeps even after the 'sync done'
-                // message arrives as part of initial searchRequest with 'refreshOnly' mode.
-                // During this sleep time any 'modifications' happened on the server 
-                // to already fetched entries will be sent as SearchResultEntries with
-                // SyncState value as 'ADD' which will conflict with the DNs of initially received entries
-                
-                // TODO thinking of alternative ways to implement this in case of large DITs 
-                // where the modification to entry(ies) can happen before the initial sync is done
-                doSyncSearch();
-            }
-            catch( Exception e )
-            {
-                LOG.error( "Failed to send a search request with RefreshAndPersist mode", e );
-            }
-        }
 
         LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
     }
@@ -345,14 +290,24 @@
         {
             Entry remoteEntry = syncResult.getEntry();
 
+            // for refreshOnly
+            if ( !config.isRefreshPersist() )
+            {
+                if ( !refreshDeletes )
+                {
+                    LOG.info( "the number of attributes present in the entry {} during present phase {}", remoteEntry
+                        .getDn().getName(), remoteEntry.size() );
+                }
+            }
+
             Control ctrl = syncResult.getControl( SyncStateValueControl.CONTROL_OID );
             SyncStateValueControlCodec syncStateCtrl = null;
-            
+
             try
             {
                 syncStateCtrl = ( SyncStateValueControlCodec ) syncStateControlDecoder.decode( ctrl.getEncodedValue() );
             }
-            catch( Exception e )
+            catch ( Exception e )
             {
                 LOG.error( "Failed to decode syncStateControl", e );
             }
@@ -372,7 +327,7 @@
 
             switch ( state )
             {
-                case ADD :
+                case ADD:
                     if ( !session.exists( remoteEntry.getDn() ) )
                     {
                         LOG.debug( "adding entry with dn {}", remoteEntry.getDn().getName() );
@@ -381,12 +336,12 @@
                     }
 
                     break;
-            
-                case MODIFY :
+
+                case MODIFY:
                     modify( remoteEntry );
                     break;
 
-                case DELETE :
+                case DELETE:
                     LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
                     session.delete( remoteEntry.getDn() );
                     break;
@@ -403,7 +358,6 @@
 
     /**
      * {@inheritDoc}
-     * atm does nothinng except examinig and printing the content of syncinfovalue control
      */
     public void handleSyncInfo( byte[] syncinfo )
     {
@@ -424,26 +378,29 @@
             List<byte[]> uuidList = syncInfoValue.getSyncUUIDs();
 
             LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() );
-            if( uuidList != null )
+            if ( uuidList != null )
             {
-                for( byte[] uuid : uuidList )
+                for ( byte[] uuid : uuidList )
                 {
                     LOG.info( "uuid: {}", StringTools.utf8ToString( uuid ) );
                 }
             }
 
+            refreshDeletes = syncInfoValue.isRefreshDeletes();
+            refreshDone = syncInfoValue.isRefreshDone();
+
             // if refreshDeletes set to true then delete all the entries with entryUUID
             // present in the syncIdSet 
-            if( syncInfoValue.isRefreshDeletes() && ( uuidList != null ) )
+            if ( syncInfoValue.isRefreshDeletes() && ( uuidList != null ) )
             {
-                for( byte[] uuid : uuidList )
+                for ( byte[] uuid : uuidList )
                 {
                     // TODO similar to delete based on DN there should be 
                     // a method to delete an Entry based on entryUUID
                     LOG.debug( "FIXME deleting the entry with entryUUID: {}", UUID.nameUUIDFromBytes( uuid ) );
                 }
             }
-            
+
             LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
         }
         catch ( DecoderException de )
@@ -461,20 +418,20 @@
      */
     public void handleSessionClosed()
     {
-        if( disconnected )
+        if ( disconnected )
         {
             return;
         }
-        
+
         boolean connected = false;
-        
-        while( !connected )
+
+        while ( !connected )
         {
             try
             {
                 Thread.sleep( config.getConsumerInterval() );
             }
-            catch( InterruptedException e )
+            catch ( InterruptedException e )
             {
                 LOG.error( "Interrupted while sleeping before trying to reconnect", e );
             }
@@ -482,105 +439,36 @@
             LOG.debug( "Trying to reconnect" );
             connected = bind();
         }
-        
+
         startSync();
     }
 
 
     /**
-     * starts the syn operation
+     * starts the synchronization operation
      */
     public void startSync()
     {
         // read the cookie if persisted
         readCookie();
-        
-        if( config.isRefreshPersist() )
+
+        if ( config.isRefreshPersist() )
         {
             try
             {
                 LOG.debug( "==================== Refresh And Persist ==========" );
-                doSyncSearch();    
+                doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST );
             }
-            catch( Exception e )
+            catch ( Exception e )
             {
                 LOG.error( "Failed to sync with refreshAndPersist mode", e );
             }
-            
-            return;
         }
-        
-        // continue till refreshAndPersist mode is not set
-        while( !config.isRefreshPersist() )
-        {
-            
-            LOG.debug( "==================== Refresh Only ==========" );
-            
-            try
-            {
-                if ( ( syncReq.getCookie() == null ) || ( syncReq.getCookie().length == 0 ) )
-                {
-                    LOG.debug( "First search (no cookie)" );
-                }
-                else
-                {
-                    LOG.debug( "searching with searchRequest, cookie '{}'", StringTools.utf8ToString( syncReq.getCookie() ) );
-                }
-                
-                doSyncSearch();
-
-                LOG.info( "--------------------- Sleep for a little while ------------------" );
-                Thread.sleep( config.getConsumerInterval() );
-                LOG.debug( "--------------------- syncing again ------------------" );
-
-            }
-            catch ( Exception e )
-            {
-                LOG.error( "Failed to sync with refresh only mode", e );
-            }
+        else
+        {
+            refreshThread = new RefresherThread();
+            refreshThread.start();
         }
-        
-        LOG.debug( "**************** Done with the initial sync ***************" );
-    }
-
-    
-    //====================== SearchListener methods ====================================
-    
-    
-    /* (non-Javadoc)
-     * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#entryFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry)
-     */
-    public void entryFound( LdapConnection connection, SearchResultEntry searchResultEntry ) throws LdapException
-    {
-        handleSearchResult( searchResultEntry );
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#referralFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultReference)
-     */
-    public void referralFound( LdapConnection connection, SearchResultReference searchResultReference )
-        throws LdapException
-    {
-        handleSearchReference( searchResultReference );
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#searchDone(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultDone)
-     */
-    public void searchDone( LdapConnection connection, SearchResultDone searchResultDone ) throws LdapException
-    {
-        handleSearchDone( searchResultDone );
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener#responseReceived(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.IntermediateResponse)
-     */
-    public void responseReceived( LdapConnection connection, IntermediateResponse intermediateResponse )
-    {
-        handleSyncInfo( intermediateResponse.getResponseValue() );
     }
 
 
@@ -589,37 +477,35 @@
      *
      * @throws Exception in case of any problems encountered while searching
      */
-    private void doSyncSearch() throws Exception
+    private void doSyncSearch( SynchronizationModeEnum syncType ) throws Exception
     {
         SyncRequestValueControl syncReq = new SyncRequestValueControl();
-        
-        if( config.isRefreshPersist() )
-        {
-            syncReq.setMode( SynchronizationModeEnum.REFRESH_AND_PERSIST );
-        }
 
+        syncReq.setMode( syncType );
         if ( syncCookie != null )
         {
+            LOG.debug( "searching with searchRequest, cookie '{}'", StringTools.utf8ToString( syncCookie ) );
             syncReq.setCookie( syncCookie );
         }
-        
-        SyncRequestValueControl syncReqControl = ( SyncRequestValueControl ) searchRequest.getControl( SyncRequestValueControl.CONTROL_OID );
-        
-        searchRequest.remove( syncReqControl );
 
         searchRequest.add( syncReq );
-        
+
         // Do the search
         connection.search( searchRequest, this );
     }
 
-    
+
     public void disconnet()
     {
         disconnected = true;
 
         try
         {
+            if ( refreshThread != null )
+            {
+                refreshThread.stopRefreshing();
+            }
+
             connection.unBind();
             LOG.info( "Unbound from the server {}", config.getProviderHost() );
 
@@ -634,7 +520,7 @@
         {
             LOG.error( "Failed to close the connection", e );
         }
-        
+
     }
 
 
@@ -643,26 +529,26 @@
      */
     private void storeCookie()
     {
-        if( syncCookie == null )
+        if ( syncCookie == null )
         {
             return;
         }
-        
+
         try
         {
             FileOutputStream fout = new FileOutputStream( cookieFile );
             fout.write( syncCookie.length );
             fout.write( syncCookie );
             fout.close();
-            
+
             LOG.debug( "stored the cookie" );
         }
-        catch( Exception e )
+        catch ( Exception e )
         {
             LOG.error( "Failed to store the cookie", e );
         }
     }
-    
+
 
     /**
      * read the cookie from a file(if exists).
@@ -671,56 +557,55 @@
     {
         try
         {
-            if( cookieFile.exists() && ( cookieFile.length() > 0 ) )
+            if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
             {
                 FileInputStream fin = new FileInputStream( cookieFile );
-                syncCookie = new byte[ fin.read() ];
+                syncCookie = new byte[fin.read()];
                 fin.read( syncCookie );
                 fin.close();
-                
+
                 LOG.debug( "read the cookie from file: " + StringTools.utf8ToString( syncCookie ) );
             }
         }
-        catch( Exception e )
+        catch ( Exception e )
         {
             LOG.error( "Failed to read the cookie", e );
         }
     }
 
-    
+
     /**
      * deletes the cookie file(if exists) 
      */
     public void deleteCookieFile()
     {
-        if( cookieFile != null && cookieFile.exists() )
+        if ( cookieFile != null && cookieFile.exists() )
         {
             LOG.debug( "deleting the cookie file" );
             cookieFile.delete();
         }
     }
-    
-    
-    private void modify( Entry remoteEntry ) throws Exception 
+
+
+    private void modify( Entry remoteEntry ) throws Exception
     {
         LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
-        
+
         Entry localEntry = session.lookup( remoteEntry.getDn() );
-        
+
         remoteEntry.removeAttributes( MOD_IGNORE_AT );
 
         List<Modification> mods = new ArrayList<Modification>();
         Iterator<EntryAttribute> itr = localEntry.iterator();
-     
+
         while ( itr.hasNext() )
         {
             EntryAttribute localAttr = itr.next();
             String attrId = localAttr.getId();
             Modification mod;
             EntryAttribute remoteAttr = remoteEntry.get( attrId );
-            
-            
-            if (  remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
+
+            if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
             {
                 mod = new ServerModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
                 remoteEntry.remove( remoteAttr );
@@ -729,19 +614,108 @@
             {
                 mod = new ServerModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
             }
-        
+
             mods.add( mod );
         }
 
-        if( remoteEntry.size() > 0 )
+        if ( remoteEntry.size() > 0 )
         {
             itr = remoteEntry.iterator();
-            while( itr.hasNext() )
+            while ( itr.hasNext() )
             {
                 mods.add( new ServerModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
             }
         }
-        
+
         session.modify( remoteEntry.getDn(), mods );
     }
+
+    /**
+     * A Thread implementation for synchronizing the DIT in refreshOnly mode
+     */
+    private class RefresherThread extends Thread
+    {
+        private volatile boolean stop;
+
+
+        public RefresherThread()
+        {
+            setDaemon( true );
+        }
+
+
+        public void run()
+        {
+            while ( !stop )
+            {
+                LOG.debug( "==================== Refresh Only ==========" );
+
+                try
+                {
+                    doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY );
+
+                    LOG.info( "--------------------- Sleep for a little while ------------------" );
+                    Thread.sleep( config.getConsumerInterval() );
+                    LOG.debug( "--------------------- syncing again ------------------" );
+
+                }
+                catch ( InterruptedException ie )
+                {
+                    LOG.warn( "refresher thread interrupted" );
+                }
+                catch ( Exception e )
+                {
+                    LOG.error( "Failed to sync with refresh only mode", e );
+                }
+            }
+        }
+
+
+        public void stopRefreshing()
+        {
+            stop = true;
+            // just in case it is sleeping
+            this.interrupt();
+        }
+    }
+
+
+    //====================== SearchListener methods ====================================
+
+    /* (non-Javadoc)
+     * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#entryFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry)
+     */
+    public void entryFound( LdapConnection connection, SearchResultEntry searchResultEntry ) throws LdapException
+    {
+        handleSearchResult( searchResultEntry );
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#referralFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultReference)
+     */
+    public void referralFound( LdapConnection connection, SearchResultReference searchResultReference )
+        throws LdapException
+    {
+        handleSearchReference( searchResultReference );
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#searchDone(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultDone)
+     */
+    public void searchDone( LdapConnection connection, SearchResultDone searchResultDone ) throws LdapException
+    {
+        handleSearchDone( searchResultDone );
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener#responseReceived(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.IntermediateResponse)
+     */
+    public void responseReceived( LdapConnection connection, IntermediateResponse intermediateResponse )
+    {
+        handleSyncInfo( intermediateResponse.getResponseValue() );
+    }
+
 }

Modified: directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java?rev=901154&r1=901153&r2=901154&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java (original)
+++ directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java Wed Jan 20 12:19:32 2010
@@ -119,6 +119,8 @@
         config.setAttributes( "*,entryUUID,entryCSN" );
         config.setSearchScope( SearchScope.SUBTREE.getJndiScope() );
         config.setReplicaId( 1 );
+        config.setRefreshPersist( false );
+        config.setConsumerInterval( 60 * 1000 );
         agent.setConfig( config );
 
         workDir = new File( System.getProperty( "java.io.tmpdir" ) + "/syncrepl-work" );