You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2010/01/25 18:02:12 UTC

svn commit: r902874 - in /directory/apacheds/trunk/syncrepl/src/main: java/org/apache/directory/server/syncrepl/SyncReplConsumer.java java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java resources/log4j.properties

Author: kayyagari
Date: Mon Jan 25 17:02:11 2010
New Revision: 902874

URL: http://svn.apache.org/viewvc?rev=902874&view=rev
Log:
o implemented delete phase in consumer
o added an index on entryUUID AT
o added a logger

Modified:
    directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java
    directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java
    directory/apacheds/trunk/syncrepl/src/main/resources/log4j.properties

Modified: directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java?rev=902874&r1=902873&r2=902874&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java (original)
+++ directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java Mon Jan 25 17:02:11 2010
@@ -24,17 +24,18 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 
 import javax.naming.ldap.Control;
 
 import org.apache.directory.server.core.CoreSession;
 import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.entry.ClonedServerEntry;
 import org.apache.directory.server.core.entry.DefaultServerEntry;
 import org.apache.directory.server.core.entry.ServerModification;
-import org.apache.directory.shared.asn1.codec.DecoderException;
+import org.apache.directory.server.core.filtering.EntryFilteringCursor;
 import org.apache.directory.shared.ldap.client.api.LdapConnection;
 import org.apache.directory.shared.ldap.client.api.exception.LdapException;
 import org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener;
@@ -52,10 +53,15 @@
 import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlDecoder;
 import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlCodec;
 import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlDecoder;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
 import org.apache.directory.shared.ldap.entry.Entry;
 import org.apache.directory.shared.ldap.entry.EntryAttribute;
 import org.apache.directory.shared.ldap.entry.Modification;
 import org.apache.directory.shared.ldap.entry.ModificationOperation;
+import org.apache.directory.shared.ldap.entry.client.ClientStringValue;
+import org.apache.directory.shared.ldap.filter.EqualityNode;
+import org.apache.directory.shared.ldap.filter.ExprNode;
+import org.apache.directory.shared.ldap.filter.OrNode;
 import org.apache.directory.shared.ldap.filter.SearchScope;
 import org.apache.directory.shared.ldap.message.AliasDerefMode;
 import org.apache.directory.shared.ldap.message.ResultCodeEnum;
@@ -64,6 +70,7 @@
 import org.apache.directory.shared.ldap.message.control.replication.SyncStateTypeEnum;
 import org.apache.directory.shared.ldap.message.control.replication.SyncStateValueControl;
 import org.apache.directory.shared.ldap.message.control.replication.SynchronizationModeEnum;
+import org.apache.directory.shared.ldap.name.LdapDN;
 import org.apache.directory.shared.ldap.schema.SchemaManager;
 import org.apache.directory.shared.ldap.util.StringTools;
 import org.slf4j.Logger;
@@ -123,7 +130,8 @@
     private SyncStateValueControlDecoder syncStateControlDecoder = new SyncStateValueControlDecoder();
 
     /** attributes on which modification should be ignored */
-    private static final String[] MOD_IGNORE_AT = new String[] { "entryUUID", "entryCSN" }; //{ "1.3.6.1.1.16.4", "1.3.6.1.4.1.4203.666.1.7" };
+    private static final String[] MOD_IGNORE_AT = new String[]
+        { "entryUUID", "entryCSN" }; //{ "1.3.6.1.1.16.4", "1.3.6.1.4.1.4203.666.1.7" };
 
     /** flag to indicate whether the current phase is for deleting entries */
     private boolean refreshDeletes;
@@ -271,6 +279,28 @@
             LOG.debug( "assigning cookie from sync done value control: " + StringTools.utf8ToString( syncCookie ) );
         }
 
+        ResultCodeEnum resultCode = searchDone.getLdapResult().getResultCode();
+
+        if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
+        {
+            /*
+                The server may return e-syncRefreshRequired
+                result code on the initial content poll if it is safe to do so when
+                it is unable to perform the operation due to various reasons.
+                reloadHint is set to FALSE in the SearchRequest Message requesting
+                the initial content poll.
+                
+                TODO: Q: The default value is already FALSE, so why should this be set to FALSE,
+                and how will that help in achieving convergence? Should the cookie be reset to null?
+             */
+            //removeCookie();
+        }
+        else if ( resultCode != ResultCodeEnum.SUCCESS )
+        {
+            // log the error and handle it appropriately
+            LOG.warn( "sync operation was not successful, received result code {}", resultCode );
+        }
+
         LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
     }
 
@@ -290,16 +320,6 @@
         {
             Entry remoteEntry = syncResult.getEntry();
 
-            // for refreshOnly
-            if ( !config.isRefreshPersist() )
-            {
-                if ( !refreshDeletes )
-                {
-                    LOG.info( "the number of attributes present in the entry {} during present phase {}", remoteEntry
-                        .getDn().getName(), remoteEntry.size() );
-                }
-            }
-
             Control ctrl = syncResult.getControl( SyncStateValueControl.CONTROL_OID );
             SyncStateValueControlCodec syncStateCtrl = null;
 
@@ -323,7 +343,11 @@
 
             LOG.debug( "state name {}", state.name() );
 
-            LOG.debug( "entryUUID = {}", StringTools.uuidToString( syncStateCtrl.getEntryUUID() ) );
+            // check to avoid conversion of UUID from byte[] to String
+            if ( LOG.isDebugEnabled() )
+            {
+                LOG.debug( "entryUUID = {}", StringTools.uuidToString( syncStateCtrl.getEntryUUID() ) );
+            }
 
             switch ( state )
             {
@@ -334,10 +358,17 @@
                         LOG.debug( remoteEntry.toString() );
                         session.add( new DefaultServerEntry( schemaManager, remoteEntry ) );
                     }
+                    // in refreshOnly mode the modified entry will be sent with state ADD
+                    else if( !config.isRefreshPersist() )
+                    {
+                        LOG.debug( "updating entry in refreshOnly mode {}", remoteEntry.getDn().getName() );
+                        modify( remoteEntry );
+                    }
 
                     break;
 
                 case MODIFY:
+                    LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
                     modify( remoteEntry );
                     break;
 
@@ -345,6 +376,10 @@
                     LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
                     session.delete( remoteEntry.getDn() );
                     break;
+
+                case PRESENT:
+                    LOG.debug( "entry present {}", remoteEntry );
+                    break;
             }
         }
         catch ( Exception e )
@@ -382,7 +417,7 @@
             {
                 for ( byte[] uuid : uuidList )
                 {
-                    LOG.info( "uuid: {}", StringTools.utf8ToString( uuid ) );
+                    LOG.info( "uuid: {}", StringTools.uuidToString( uuid ) );
                 }
             }
 
@@ -393,17 +428,12 @@
             // present in the syncIdSet 
             if ( syncInfoValue.isRefreshDeletes() && ( uuidList != null ) )
             {
-                for ( byte[] uuid : uuidList )
-                {
-                    // TODO similar to delete based on DN there should be 
-                    // a method to delete an Entry based on entryUUID
-                    LOG.debug( "FIXME deleting the entry with entryUUID: {}", UUID.nameUUIDFromBytes( uuid ) );
-                }
+                deleteEntries( uuidList );
             }
 
             LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
         }
-        catch ( DecoderException de )
+        catch ( Exception de )
         {
             LOG.error( "Failed to handle syncinfo message" );
             de.printStackTrace();
@@ -513,8 +543,12 @@
             LOG.info( "Connection closed for the server {}", config.getProviderHost() );
 
             connection = null;
+            
             // persist the cookie
             storeCookie();
+            
+            // reset the cookie
+            syncCookie = null;
         }
         catch ( Exception e )
         {
@@ -575,6 +609,24 @@
 
 
     /**
+     * deletes the cookie file if it exists and is not empty
+     * and resets the syncCookie to null
+     */
+    private void removeCookie()
+    {
+        // only a cookie file that exists and has content gets deleted
+        boolean hasStoredCookie = cookieFile.exists() && ( cookieFile.length() > 0 );
+        if ( hasStoredCookie )
+        {
+            LOG.info( "deleted cookie file {}", cookieFile.delete() );
+        }
+
+        LOG.info( "resetting sync cookie" );
+        syncCookie = null;
+    }
+
+
+    /**
      * deletes the cookie file(if exists) 
      */
     public void deleteCookieFile()
@@ -589,8 +641,6 @@
 
     private void modify( Entry remoteEntry ) throws Exception
     {
-        LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
-
         Entry localEntry = session.lookup( remoteEntry.getDn() );
 
         remoteEntry.removeAttributes( MOD_IGNORE_AT );
@@ -630,6 +680,84 @@
         session.modify( remoteEntry.getDn(), mods );
     }
 
+
+    /**
+     * Deletes the local entries whose entryUUID is contained in the given list.
+     * 
+     * The list is cut into consecutive batches of at most ten UUIDs and every
+     * batch is passed, in order, to _deleteEntries_(). Only the last batch can
+     * hold fewer than ten UUIDs.
+     * 
+     * Nothing is done when the list is null or empty.
+     * 
+     * @param uuidList the list of UUIDs 
+     * @throws Exception in case of any problems while deleting the entries
+     */
+    public void deleteEntries( List<byte[]> uuidList ) throws Exception
+    {
+        if ( ( uuidList == null ) || uuidList.isEmpty() )
+        {
+            return;
+        }
+
+        // upper bound on the number of UUIDs handed over per batch
+        final int batchSize = 10;
+        final int total = uuidList.size();
+
+        // walk the list one batch at a time, front to back
+        int from = 0;
+
+        while ( from < total )
+        {
+            int to = Math.min( from + batchSize, total );
+
+            _deleteEntries_( uuidList.subList( from, to ) );
+
+            from = to;
+        }
+    }
+    
+    
+    /**
+     * Searches the subtree below the configured base DN for the entries whose
+     * entryUUID matches one of the given UUIDs and deletes every entry found.
+     * Do not call this method directly, instead call deleteEntries().
+     *
+     * @param limitedUuidList a batch of UUIDs as cut by deleteEntries() (at most 10 per batch)
+     * @throws Exception if the search, the cursor iteration or a delete fails;
+     *         the search cursor is closed in that case as well
+     */
+    private void _deleteEntries_( List<byte[]> limitedUuidList ) throws Exception
+    {
+        ExprNode filter = null;
+        int size = limitedUuidList.size();
+        if( size == 1 )
+        {
+            // a single UUID is matched with a plain equality filter
+            String uuid = StringTools.uuidToString( limitedUuidList.get( 0 ) );
+            filter = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, new ClientStringValue( uuid ) );
+        }
+        else
+        {
+            // match any of the UUIDs of the batch: OR of one equality node per UUID
+            OrNode anyUuid = new OrNode();
+            for ( byte[] rawUuid : limitedUuidList )
+            {
+                String uuid = StringTools.uuidToString( rawUuid );
+                anyUuid.addNode( new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, new ClientStringValue( uuid ) ) );
+            }
+            filter = anyUuid;
+        }
+
+        LdapDN dn = new LdapDN( config.getBaseDn() );
+        dn.normalize( schemaManager.getNormalizerMapping() );
+
+        EntryFilteringCursor cursor = session.search( dn, SearchScope.SUBTREE, filter, AliasDerefMode.NEVER_DEREF_ALIASES, new HashSet() );
+
+        try
+        {
+            while( cursor.next() )
+            {
+                session.delete( cursor.get().getDn(), true );
+            }
+        }
+        finally
+        {
+            // release the cursor even when the iteration or a delete throws
+            cursor.close();
+        }
+    }
+    
+    
     /**
      * A Thread implementation for synchronizing the DIT in refreshOnly mode
      */

Modified: directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java?rev=902874&r1=902873&r2=902874&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java (original)
+++ directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java Mon Jan 25 17:02:11 2010
@@ -224,6 +224,12 @@
             partition.setPartitionDir( new File( workDir, partition.getId() ) );
             partition.setSyncOnWrite( true );
             partition.setSchemaManager( dirService.getSchemaManager() );
+            
+            // Add an index on the entryUUID attribute for this partition
+            Set<Index<?, ServerEntry>> indexedAttrs = new HashSet<Index<?, ServerEntry>>();
+            indexedAttrs.add( new JdbmIndex<Object, ServerEntry>( SchemaConstants.ENTRY_UUID_AT ) );
+            ( ( JdbmPartition ) partition ).setIndexedAttributes( indexedAttrs );
+
             partition.initialize();
 
             dirService.addPartition( partition );

Modified: directory/apacheds/trunk/syncrepl/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/resources/log4j.properties?rev=902874&r1=902873&r2=902874&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/resources/log4j.properties (original)
+++ directory/apacheds/trunk/syncrepl/src/main/resources/log4j.properties Mon Jan 25 17:02:11 2010
@@ -24,6 +24,7 @@
 log4j.logger.org.apache.directory.server.core=ERROR
 log4j.logger.JdbmIndex=ERROR
 log4j.logger.JdbmTable=ERROR
+log4j.logger.LOG_CHANGES=ERROR
 log4j.logger.org.apache.directory.server.core=ERROR
 log4j.logger.org.apache.directory.shared=ERROR
 log4j.logger.org.apache.directory.shared.ldap.schema.registries=ERROR