You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by el...@apache.org on 2011/07/29 06:55:29 UTC
svn commit: r1152104 - in
/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication:
ClientInitialRefreshIT.java MockSyncReplConsumer.java
Author: elecharny
Date: Fri Jul 29 04:55:28 2011
New Revision: 1152104
URL: http://svn.apache.org/viewvc?rev=1152104&view=rev
Log:
o Added a Mock object for SyncRepl consumer
o Added a test to check that we correctly load 1000 entries added in a producer
Added:
directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java
directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java
Added: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java?rev=1152104&view=auto
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java (added)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java Fri Jul 29 04:55:28 2011
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.directory.server.replication;
+
+
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.server.annotations.CreateLdapServer;
+import org.apache.directory.server.annotations.CreateTransport;
+import org.apache.directory.server.core.CoreSession;
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.annotations.ContextEntry;
+import org.apache.directory.server.core.annotations.CreateDS;
+import org.apache.directory.server.core.annotations.CreateIndex;
+import org.apache.directory.server.core.annotations.CreatePartition;
+import org.apache.directory.server.core.factory.DSAnnotationProcessor;
+import org.apache.directory.server.core.integ.FrameworkRunner;
+import org.apache.directory.server.factory.ServerAnnotationProcessor;
+import org.apache.directory.server.ldap.LdapServer;
+import org.apache.directory.server.ldap.replication.SyncReplRequestHandler;
+import org.apache.directory.server.ldap.replication.SyncreplConfiguration;
+import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+import org.apache.directory.shared.ldap.model.message.SearchRequest;
+import org.apache.directory.shared.ldap.model.message.SearchRequestImpl;
+import org.apache.directory.shared.ldap.model.name.Dn;
+import org.apache.directory.shared.ldap.model.schema.SchemaManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the initial refresh of a client
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public class ClientInitialRefreshIT
+{
+ /** The LDAP server acting as the replication provider; created in startProvider() */
+ private static LdapServer providerServer;
+
+ /** The provider's schema manager; assigned once the provider has started */
+ private static SchemaManager schemaManager;
+
+ /** The provider's admin session, used to inject the test entries */
+ private static CoreSession providerSession;
+
+ /** Counter used by createEntry() to generate unique user names */
+ private static AtomicInteger entryCount = new AtomicInteger();
+
+ @BeforeClass
+ public static void setUp() throws Exception
+ {
+ Class<?> justLoadToSetControlProperties = Class.forName( FrameworkRunner.class.getName() );
+
+ startProvider();
+
+ // Load 1000 entries
+ for ( int i = 0; i < 1000; i++ )
+ {
+ Entry entry = createEntry();
+
+ providerSession.add( entry );
+ }
+ }
+
+
+    /**
+     * Stops the provider server, if it has been created.
+     */
+    @AfterClass
+    public static void tearDown()
+    {
+        // The server is null when startProvider() failed early: do not hide
+        // the original failure behind a NullPointerException
+        if ( providerServer != null )
+        {
+            providerServer.stop();
+        }
+    }
+
+
+    /**
+     * Check that the entry exists in the target server. We poll up to 100 times,
+     * sleeping 100ms before each attempt (10 seconds at most), and stop as soon
+     * as the entry is found.
+     *
+     * @param session the session used to look for the entry
+     * @param entryDn the Dn of the entry we are waiting for
+     * @return true if the entry has been found before the attempts were exhausted
+     */
+    private boolean checkEntryExistence( CoreSession session, Dn entryDn ) throws Exception
+    {
+        int attemptsLeft = 100;
+
+        while ( attemptsLeft > 0 )
+        {
+            attemptsLeft--;
+            Thread.sleep( 100 );
+
+            if ( session.exists( entryDn ) )
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
+ /**
+ * Looks up the given entry (user and operational attributes) on the provider.
+ * Despite its name, this method currently neither waits nor compares anything:
+ * the consumer side lookup and the comparison are commented out below.
+ *
+ * @param dn the Dn of the entry to look up on the provider
+ */
+ private void waitAndCompareEntries( Dn dn ) throws Exception
+ {
+ // NOTE: an earlier comment announced a 2 seconds sleep here, but no sleep is performed
+ Entry providerEntry = providerSession.lookup( dn, "*", "+" );
+
+ // Disabled: there is no consumer session in this test to compare against
+ //Entry consumerEntry = consumerSession.lookup( dn, "*", "+" );
+ //assertEquals( providerEntry, consumerEntry );
+ }
+
+
+    /**
+     * Builds a new person entry under dc=example,dc=com. The cn and sn both
+     * hold a generated name, "user" followed by the next value of the entry counter.
+     *
+     * @return the newly created entry
+     */
+    private static Entry createEntry() throws Exception
+    {
+        String name = "user" + entryCount.incrementAndGet();
+
+        return new DefaultEntry( schemaManager, "cn=" + name + ",dc=example,dc=com",
+            "objectClass", "person",
+            "cn", name,
+            "sn", name );
+    }
+
+
+ @CreateDS(allowAnonAccess = true, name = "provider-replication", partitions =
+ {
+ @CreatePartition(
+ name = "example",
+ suffix = "dc=example,dc=com",
+ indexes =
+ {
+ @CreateIndex(attribute = "objectClass"),
+ @CreateIndex(attribute = "dc"),
+ @CreateIndex(attribute = "ou")
+ },
+ contextEntry=@ContextEntry( entryLdif =
+ "dn: dc=example,dc=com\n" +
+ "objectClass: domain\n" +
+ "dc: example" ) )
+ })
+ @CreateLdapServer(transports =
+ { @CreateTransport( port=16000, protocol = "LDAP") })
+ private static void startProvider() throws Exception
+ {
+ Method createProviderMethod = ClientInitialRefreshIT.class.getDeclaredMethod( "startProvider" );
+ CreateDS dsAnnotation = createProviderMethod.getAnnotation( CreateDS.class );
+ DirectoryService provDirService = DSAnnotationProcessor.createDS( dsAnnotation );
+
+ CreateLdapServer serverAnnotation = createProviderMethod.getAnnotation( CreateLdapServer.class );
+
+ providerServer = ServerAnnotationProcessor.instantiateLdapServer( serverAnnotation, provDirService, 0 );
+
+ providerServer.setReplicationReqHandler( new SyncReplRequestHandler() );
+
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ providerServer.start();
+ schemaManager = providerServer.getDirectoryService().getSchemaManager();
+ providerSession = providerServer.getDirectoryService().getAdminSession();
+ }
+ catch( Exception e )
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Thread t = new Thread( r );
+ t.setDaemon( true );
+ t.start();
+ t.join();
+ }
+
+
+    /**
+     * Starts a mock syncrepl consumer against the provider and checks that it
+     * receives the 1000 injected entries plus the context entry.
+     */
+    @Test
+    public void testInitialRefresh() throws Exception
+    {
+        final MockSyncReplConsumer syncreplClient = new MockSyncReplConsumer();
+        final SyncreplConfiguration config = new SyncreplConfiguration();
+        config.setRemoteHost( "localhost" );
+        config.setRemotePort( 16000 );
+        config.setReplUserDn( "uid=admin,ou=system" );
+        // The consumer decodes the password as UTF-8, so encode it the same way
+        config.setReplUserPassword( "secret".getBytes( "UTF-8" ) );
+        config.setUseTls( false );
+        config.setBaseDn( "dc=example,dc=com" );
+        config.setRefreshInterval( 1000 );
+
+        syncreplClient.setConfig( config );
+
+        // Carries a failure of the consumer thread back to the test thread
+        final java.util.concurrent.atomic.AtomicReference<Exception> consumerFailure =
+            new java.util.concurrent.atomic.AtomicReference<Exception>();
+
+        Runnable consumerTask = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    syncreplClient.init( schemaManager );
+                    syncreplClient.start();
+                }
+                catch ( Exception e )
+                {
+                    consumerFailure.set( e );
+                }
+            }
+        };
+
+        Thread consumerThread = new Thread( consumerTask );
+        consumerThread.setDaemon( true );
+        consumerThread.start();
+
+        try
+        {
+            // We should have 1000 entries plus the base entry = 1001
+            boolean replicated = waitForSyncReplClient( syncreplClient, 1001 );
+
+            Exception failure = consumerFailure.get();
+
+            if ( failure != null )
+            {
+                throw new IllegalStateException( "The syncrepl consumer failed to start", failure );
+            }
+
+            assertTrue( "The consumer did not receive the 1001 expected entries", replicated );
+        }
+        finally
+        {
+            // Release the connection to the provider whatever the outcome is
+            syncreplClient.stop();
+        }
+    }
+
+
+    /**
+     * Wait for the expected number of entries to be added into the client.
+     * The counter is polled up to 50 times, with a 100ms sleep before each poll,
+     * and every polled value is printed on the standard output.
+     *
+     * @param consumer the consumer whose added entries counter is polled
+     * @param expected the number of added entries we are waiting for
+     * @return true if the counter reached exactly the expected value in time
+     */
+    private boolean waitForSyncReplClient( MockSyncReplConsumer consumer, int expected ) throws Exception
+    {
+        System.out.println( "NbAdded every 100ms : " );
+
+        for ( int attempt = 0; attempt < 50; attempt++ )
+        {
+            Thread.sleep( 100 );
+
+            int current = consumer.getNbAdded();
+
+            // Values are comma separated, hence no separator before the first one
+            String separator = ( attempt == 0 ) ? "" : ", ";
+            System.out.print( separator + current );
+
+            if ( current == expected )
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
Added: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java?rev=1152104&view=auto
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java (added)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java Fri Jul 29 04:55:28 2011
@@ -0,0 +1,1046 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.directory.server.replication;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.ldap.client.api.ConnectionClosedEventListener;
+import org.apache.directory.ldap.client.api.LdapNetworkConnection;
+import org.apache.directory.ldap.client.api.future.SearchFuture;
+import org.apache.directory.server.core.filtering.EntryFilteringCursor;
+import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
+import org.apache.directory.server.ldap.replication.SyncreplConfiguration;
+import org.apache.directory.shared.ldap.codec.api.LdapApiService;
+import org.apache.directory.shared.ldap.codec.api.LdapApiServiceFactory;
+import org.apache.directory.shared.ldap.extras.controls.SyncDoneValue;
+import org.apache.directory.shared.ldap.extras.controls.SyncInfoValue;
+import org.apache.directory.shared.ldap.extras.controls.SyncModifyDn;
+import org.apache.directory.shared.ldap.extras.controls.SyncModifyDnType;
+import org.apache.directory.shared.ldap.extras.controls.SyncStateTypeEnum;
+import org.apache.directory.shared.ldap.extras.controls.SyncStateValue;
+import org.apache.directory.shared.ldap.extras.controls.SynchronizationModeEnum;
+import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncInfoValueDecorator;
+import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncRequestValueDecorator;
+import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.model.entry.Attribute;
+import org.apache.directory.shared.ldap.model.entry.DefaultAttribute;
+import org.apache.directory.shared.ldap.model.entry.DefaultModification;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+import org.apache.directory.shared.ldap.model.entry.Modification;
+import org.apache.directory.shared.ldap.model.entry.ModificationOperation;
+import org.apache.directory.shared.ldap.model.exception.LdapException;
+import org.apache.directory.shared.ldap.model.filter.AndNode;
+import org.apache.directory.shared.ldap.model.filter.EqualityNode;
+import org.apache.directory.shared.ldap.model.filter.ExprNode;
+import org.apache.directory.shared.ldap.model.filter.NotNode;
+import org.apache.directory.shared.ldap.model.filter.OrNode;
+import org.apache.directory.shared.ldap.model.filter.PresenceNode;
+import org.apache.directory.shared.ldap.model.message.IntermediateResponse;
+import org.apache.directory.shared.ldap.model.message.Response;
+import org.apache.directory.shared.ldap.model.message.ResultCodeEnum;
+import org.apache.directory.shared.ldap.model.message.SearchRequest;
+import org.apache.directory.shared.ldap.model.message.SearchRequestImpl;
+import org.apache.directory.shared.ldap.model.message.SearchResultDone;
+import org.apache.directory.shared.ldap.model.message.SearchResultEntry;
+import org.apache.directory.shared.ldap.model.message.SearchResultReference;
+import org.apache.directory.shared.ldap.model.name.Dn;
+import org.apache.directory.shared.ldap.model.name.Rdn;
+import org.apache.directory.shared.ldap.model.schema.AttributeType;
+import org.apache.directory.shared.ldap.model.schema.AttributeTypeOptions;
+import org.apache.directory.shared.ldap.model.schema.SchemaManager;
+import org.apache.directory.shared.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ * Implementation of syncrepl slave a.k.a consumer.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public class MockSyncReplConsumer implements ConnectionClosedEventListener
+//, ReplicationConsumer
+{
+ /** the logger */
+ private static final Logger LOG = LoggerFactory.getLogger( MockSyncReplConsumer.class );
+
+ /** The codec service, used to build the sync request and sync info control decorators */
+ private LdapApiService ldapCodecService = LdapApiServiceFactory.getSingleton();
+
+ /** the syncrepl configuration, injected through setConfig() */
+ private SyncreplConfiguration config;
+
+ /** the last sync cookie sent by the server, null if none has been received or read yet */
+ private byte[] syncCookie;
+
+ /** connection to the syncrepl provider, created on the first call to connect() */
+ private LdapNetworkConnection connection;
+
+ /** the search request, built by prepareSyncSearchRequest(); the sync control is added by doSyncSearch() */
+ private SearchRequest searchRequest;
+
+ /** the schema manager, as given to init() */
+ private SchemaManager schemaManager;
+
+ /** the file in which the cookie is persisted */
+ private File cookieFile;
+
+ /** flag set by disconnet(); when set, connectionClosed() does not try to reconnect */
+ private boolean disconnected;
+
+ /** The number of entries received with the ADD sync state */
+ private AtomicInteger nbAdded = new AtomicInteger(0);
+
+ /** attributes on which modification should be ignored (only referenced from the commented out code of modify()) */
+ private static final String[] MOD_IGNORE_AT = new String[]
+ {
+ SchemaConstants.ENTRY_UUID_AT,
+ SchemaConstants.ENTRY_CSN_AT,
+ SchemaConstants.MODIFIERS_NAME_AT,
+ SchemaConstants.MODIFY_TIMESTAMP_AT,
+ SchemaConstants.CREATE_TIMESTAMP_AT,
+ SchemaConstants.CREATORS_NAME_AT,
+ SchemaConstants.ENTRY_PARENT_ID_AT
+ };
+
+ /** A thread used to refresh in refreshOnly mode */
+ private RefresherThread refreshThread;
+
+ /** a copy of the cookie that was saved last time, used to skip redundant writes */
+ private byte[] lastSavedCookie;
+
+ /** The (entryUuid=*) filter */
+ private static final PresenceNode ENTRY_UUID_PRESENCE_FILTER = new PresenceNode( SchemaConstants.ENTRY_UUID_AT );
+
+ /** The set used for search attributes, containing only the entryUuid AT (filled by init()) */
+ private static final Set<AttributeTypeOptions> ENTRY_UUID_ATOP_SET = new HashSet<AttributeTypeOptions>();
+
+ /** A list holding a single REPLACE modification on the cookie attribute, built by init() */
+ private List<Modification> cookieModLst;
+
+ /** AttributeTypes used for replication, looked up from the schema manager by init() */
+ private static AttributeType COOKIE_AT_TYPE;
+ private static AttributeType ENTRY_UUID_AT;
+
+
+ /**
+ * @return the syncrepl configuration previously given to setConfig(), or null
+ * if no configuration has been set yet
+ */
+ public SyncreplConfiguration getConfig()
+ {
+ return config;
+ }
+
+
+    /**
+     * Init the replication consumer: looks up the attribute types used for
+     * replication, prepares the cookie modification list, creates the directory
+     * in which the cookie file is stored and builds the sync search request.
+     * The configuration must have been set with setConfig() beforehand.
+     *
+     * @param schemaManager The schema manager used to look up the attribute types
+     * @throws Exception if an attribute type is unknown, if the cookie directory
+     * cannot be created or if the search request cannot be built
+     */
+    public void init( SchemaManager schemaManager ) throws Exception
+    {
+        if ( config == null )
+        {
+            throw new IllegalStateException( "The configuration must be set before calling init()" );
+        }
+
+        this.schemaManager = schemaManager;
+        ENTRY_UUID_AT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ENTRY_UUID_AT );
+        COOKIE_AT_TYPE = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE );
+
+        ENTRY_UUID_ATOP_SET.add( new AttributeTypeOptions( ENTRY_UUID_AT ) );
+
+        Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE );
+
+        Modification cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
+        cookieModLst = new ArrayList<Modification>( 1 );
+        cookieModLst.add( cookieMod );
+
+        // createTempFile() creates a regular file: replace it with a directory
+        // of the same (unique) name, otherwise the cookie file could never be
+        // created underneath it
+        File cookieDir = File.createTempFile( "cookies", "" );
+
+        if ( !cookieDir.delete() || !cookieDir.mkdir() )
+        {
+            throw new java.io.IOException( "Cannot create the cookie directory " + cookieDir );
+        }
+
+        // Registered before the cookie file, so that it is deleted after it on exit
+        cookieDir.deleteOnExit();
+        cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) );
+        cookieFile.deleteOnExit();
+
+        prepareSyncSearchRequest();
+    }
+
+
+    /**
+     * Connect to the remote server and bind with the replication user. The
+     * connection is only created the first time; later calls reuse it and just
+     * bind again. Note that a SyncRepl consumer is connected to only one
+     * remote server.
+     *
+     * @return true if the bind succeeded, false if anything went wrong (the
+     * failure is logged, never thrown)
+     */
+    public boolean connect()
+    {
+        try
+        {
+            if ( connection == null )
+            {
+                // First call: create the connection, without any timeout
+                connection = new LdapNetworkConnection( config.getRemoteHost(), config.getRemotePort() );
+                connection.setTimeOut( -1L );
+
+                if( config.isUseTls() )
+                {
+                    connection.getConfig().setTrustManagers( config.getTrustManager() );
+                    connection.startTls();
+                }
+
+                // We want to be informed when the provider closes the connection
+                connection.addConnectionClosedEventListener( this );
+            }
+
+            try
+            {
+                String bindDn = config.getReplUserDn();
+                String credentials = Strings.utf8ToString( config.getReplUserPassword() );
+
+                connection.bind( bindDn, credentials );
+
+                return true;
+            }
+            catch ( LdapException le )
+            {
+                LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn() );
+                LOG.warn( "", le );
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to bind with the given bindDN and credentials", e );
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Builds the SearchRequest used for syncing the DIT content. Base, filter,
+     * limits, alias dereferencing mode, scope and attributes all come from the
+     * configuration. The sync control itself is not added here.
+     *
+     * @throws LdapException if the request cannot be built from the configuration
+     */
+    public void prepareSyncSearchRequest() throws LdapException
+    {
+        String replicaBase = config.getBaseDn();
+
+        SearchRequest request = new SearchRequestImpl();
+        searchRequest = request;
+
+        // Where and what to search
+        request.setBase( new Dn( replicaBase ) );
+        request.setFilter( config.getFilter() );
+
+        // Limits
+        request.setSizeLimit( config.getSearchSizeLimit() );
+        request.setTimeLimit( config.getSearchTimeout() );
+
+        // Search behaviour
+        request.setDerefAliases( config.getAliasDerefMode() );
+        request.setScope( config.getSearchScope() );
+        request.setTypesOnly( false );
+
+        request.addAttributes( config.getAttributes() );
+    }
+
+
+ public ResultCodeEnum handleSearchDone( SearchResultDone searchDone )
+ {
+ LOG.debug( "///////////////// handleSearchDone //////////////////" );
+
+ SyncDoneValue ctrl = (SyncDoneValue)searchDone.getControls().get( SyncDoneValue.OID );
+
+ if ( ( ctrl != null ) && ( ctrl.getCookie() != null ) )
+ {
+ syncCookie = ctrl.getCookie();
+ LOG.debug( "assigning cookie from sync done value control: " + Strings.utf8ToString(syncCookie) );
+ storeCookie();
+ }
+
+ LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
+
+ return searchDone.getLdapResult().getResultCode();
+ }
+
+
+ /**
+ * Intentionally does nothing: search references are ignored by this consumer.
+ *
+ * @param searchRef the received search result reference (unused)
+ */
+ public void handleSearchReference( SearchResultReference searchRef )
+ {
+ // this method won't be called cause the provider will serve the referrals as
+ // normal entry objects due to the usage of ManageDsaITControl in the search request
+ }
+
+
+    /**
+     * Processes one entry returned by the sync search, according to the state
+     * carried by its sync state control: ADD increments the added entries
+     * counter, MODIFY, MODDN and DELETE are forwarded to the matching
+     * handlers, PRESENT is only logged. The cookie found in the control, if
+     * any, is persisted when the operation went through without an exception.
+     * Failures are logged, never thrown.
+     *
+     * @param syncResult the received search result entry
+     */
+    public void handleSearchResult( SearchResultEntry syncResult )
+    {
+        LOG.debug( "------------- starting handleSearchResult ------------" );
+
+        SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID );
+
+        if ( syncStateCtrl == null )
+        {
+            // Without the control we cannot tell what to do with the entry. Say it
+            // explicitly instead of logging a message-less NullPointerException.
+            LOG.error( "Received a search result entry without a sync state control, ignoring it" );
+            LOG.debug( "------------- Ending handleSearchResult ------------" );
+            return;
+        }
+
+        try
+        {
+            Entry remoteEntry = syncResult.getEntry();
+
+            if ( syncStateCtrl.getCookie() != null )
+            {
+                syncCookie = syncStateCtrl.getCookie();
+                LOG.debug( "assigning the cookie from sync state value control: "
+                    + Strings.utf8ToString(syncCookie) );
+            }
+
+            SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
+
+            LOG.debug( "state name {}", state.name() );
+
+            // check to avoid conversion of UUID from byte[] to String
+            if ( LOG.isDebugEnabled() )
+            {
+                LOG.debug( "entryUUID = {}", Strings.uuidToString(syncStateCtrl.getEntryUUID()) );
+            }
+
+            switch ( state )
+            {
+                case ADD:
+                    // The mock does not store the entry, it only counts it
+                    nbAdded.getAndIncrement();
+                    break;
+
+                case MODIFY:
+                    LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
+                    modify( remoteEntry );
+                    break;
+
+                case MODDN:
+                    SyncModifyDn adsModDnControl = ( SyncModifyDn ) syncResult.getControls().get( SyncModifyDn.OID );
+                    //Apache Directory Server's special control
+                    applyModDnOperation( adsModDnControl );
+                    break;
+
+                case DELETE:
+                    LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
+                    // incase of a MODDN operation resulting in a branch to be moved out of scope
+                    // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
+                    // so the branch needs to be recursively deleted here
+                    deleteRecursive( remoteEntry.getDn(), null );
+                    break;
+
+                case PRESENT:
+                    LOG.debug( "entry present {}", remoteEntry );
+                    break;
+            }
+
+            // store the cookie only if the above operation was successful
+            if ( syncStateCtrl.getCookie() != null )
+            {
+                storeCookie();
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( e.getMessage(), e );
+        }
+
+        LOG.debug( "------------- Ending handleSearchResult ------------" );
+    }
+
+
+    /**
+     * Processes a sync info intermediate response: decodes the sync info
+     * value, keeps its cookie if it has one, deletes entries according to the
+     * received UUID list and persists the cookie. Failures are logged, never
+     * thrown.
+     *
+     * @param syncInfoResp the received intermediate response
+     */
+    public void handleSyncInfo( IntermediateResponse syncInfoResp )
+    {
+        try
+        {
+            LOG.debug( "............... inside handleSyncInfo ..............." );
+
+            SyncInfoValueDecorator decorator = new SyncInfoValueDecorator( ldapCodecService );
+            byte[] syncinfo = syncInfoResp.getResponseValue();
+            decorator.setValue( syncinfo );
+            SyncInfoValue syncInfoValue = decorator.getDecorated();
+
+            byte[] cookie = syncInfoValue.getCookie();
+
+            if ( cookie != null )
+            {
+                LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString(cookie) );
+                syncCookie = cookie;
+            }
+
+            boolean refreshDeletes = syncInfoValue.isRefreshDeletes();
+            LOG.info( "refreshDeletes: " + refreshDeletes );
+
+            // if refreshDeletes is set to true then the UUIDs are those of the
+            // entries to delete, otherwise they are those of the present entries
+            deleteEntries( syncInfoValue.getSyncUUIDs(), !refreshDeletes );
+
+            LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
+
+            storeCookie();
+        }
+        catch ( Exception de )
+        {
+            // Go through the logger, with the cause, rather than printStackTrace()
+            LOG.error( "Failed to handle syncinfo message", de );
+        }
+
+        LOG.debug( ".................... END handleSyncInfo ..............." );
+    }
+
+
+    /**
+     * Called when the connection to the provider has been closed. Unless the
+     * consumer has been disconnected on purpose, we try to reconnect, waiting
+     * for the refresh interval before each attempt, then restart the
+     * synchronization. The reconnection is abandoned if the thread is
+     * interrupted or if the consumer gets disconnected in the meantime.
+     */
+    public void connectionClosed()
+    {
+        if ( disconnected )
+        {
+            return;
+        }
+
+        boolean connected = false;
+
+        while ( !connected )
+        {
+            try
+            {
+                Thread.sleep( config.getRefreshInterval() );
+            }
+            catch ( InterruptedException e )
+            {
+                LOG.error( "Interrupted while sleeping before trying to reconnect", e );
+
+                // Somebody wants this thread to stop: restore the flag and give up
+                Thread.currentThread().interrupt();
+                return;
+            }
+
+            if ( disconnected )
+            {
+                // disconnet() has been called while we were sleeping
+                return;
+            }
+
+            LOG.debug( "Trying to reconnect" );
+            connected = connect();
+        }
+
+        startSync();
+    }
+
+
+    /**
+     * Starts the synchronization operation. The persisted cookie, if any, is
+     * read first. In refreshAndPersist mode the sync search is run in the
+     * calling thread, otherwise a refresher thread is created and started.
+     */
+    public void startSync()
+    {
+        // read the cookie if persisted
+        readCookie();
+
+        if ( !config.isRefreshNPersist() )
+        {
+            // refreshOnly mode: the periodic refresh is delegated to a thread
+            refreshThread = new RefresherThread();
+            refreshThread.start();
+
+            return;
+        }
+
+        try
+        {
+            LOG.debug( "==================== Refresh And Persist ==========" );
+            doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, false );
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to sync with refreshAndPersist mode", e );
+        }
+    }
+
+
+ /**
+ * Sets the configuration of this consumer.
+ *
+ * @param config the configuration to use; it is cast to SyncreplConfiguration,
+ * so any other implementation makes the cast fail
+ */
+ public void setConfig( ReplicationConsumerConfig config )
+ {
+ this.config = ( SyncreplConfiguration ) config;
+ }
+
+
+ /**
+ * Starts the consumer: connects to the provider, then starts the synchronization.
+ */
+ public void start()
+ {
+ // NOTE(review): the result of connect() is ignored, the synchronization is
+ // started even when the connection or the bind failed - confirm this is intended
+ connect();
+ startSync();
+ }
+
+
+ /**
+ * Stops the consumer by disconnecting it from the provider.
+ */
+ public void stop()
+ {
+ disconnet();
+ }
+
+
+ /**
+ * @return the replica id found in the configuration, as a String
+ */
+ public String getId()
+ {
+ return String.valueOf( getConfig().getReplicaId() );
+ }
+
+
+    /**
+     * Performs a search on the connection with an updated sync request
+     * control, and dispatches every received response to the matching handler
+     * until the search is done or cancelled.
+     * <ul>
+     * <li>when the provider answers NO_SUCH_OBJECT, a consumer running in
+     * refreshAndPersist mode is disconnected</li>
+     * <li>when the provider answers E_SYNC_REFRESH_REQUIRED, the replica base is
+     * deleted, the cookie is removed and the search is run again with the
+     * reload hint set</li>
+     * </ul>
+     *
+     * @param syncType the synchronization mode to set in the control
+     * @param reloadHint the reload hint to set in the control
+     * @throws Exception in case of any problems encountered while searching
+     */
+    private void doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
+    {
+        SyncRequestValueDecorator syncReq = new SyncRequestValueDecorator( ldapCodecService );
+
+        syncReq.setMode( syncType );
+        syncReq.setReloadHint( reloadHint );
+
+        if ( syncCookie != null )
+        {
+            LOG.debug( "searching with searchRequest, cookie '{}'", Strings.utf8ToString(syncCookie) );
+            syncReq.setCookie( syncCookie );
+        }
+
+        searchRequest.addControl( syncReq );
+
+        // Do the search
+        SearchFuture sf = connection.searchAsync( searchRequest );
+
+        Response resp = sf.get();
+
+        while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() )
+        {
+            if ( resp instanceof SearchResultEntry )
+            {
+                handleSearchResult( ( SearchResultEntry ) resp );
+            }
+            else if ( resp instanceof SearchResultReference )
+            {
+                handleSearchReference( ( SearchResultReference ) resp );
+            }
+            else if ( resp instanceof IntermediateResponse )
+            {
+                handleSyncInfo( (IntermediateResponse) resp );
+            }
+
+            resp = sf.get();
+        }
+
+        if ( !( resp instanceof SearchResultDone ) )
+        {
+            // The loop has been left because the search was cancelled: there is
+            // no done response to process, and casting would fail
+            LOG.warn( "the sync search has been cancelled before its completion" );
+            return;
+        }
+
+        ResultCodeEnum resultCode = handleSearchDone( ( SearchResultDone ) resp );
+
+        LOG.debug( "sync operation returned result code {}", resultCode );
+
+        if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT )
+        {
+            // log the error and handle it appropriately
+            LOG.warn( "given replication base Dn {} is not found on provider", config.getBaseDn() );
+
+            if ( syncType == SynchronizationModeEnum.REFRESH_AND_PERSIST )
+            {
+                LOG.warn( "disconnecting the consumer running in refreshAndPersist mode from the provider" );
+                disconnet();
+            }
+        }
+        else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
+        {
+            LOG.info( "unable to perform the content synchronization cause E_SYNC_REFRESH_REQUIRED" );
+
+            try
+            {
+                deleteRecursive( new Dn( config.getBaseDn() ), null );
+            }
+            catch ( Exception e )
+            {
+                LOG.error(
+                    "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer",
+                    e );
+                disconnet();
+
+                // The connection is gone, searching again would only fail on it
+                return;
+            }
+
+            removeCookie();
+            doSyncSearch( syncType, true );
+        }
+    }
+
+
+    /**
+     * Disconnects the consumer from the provider: stops the refresher thread
+     * if there is one, unbinds and closes the connection if there is one, then
+     * persists and resets the cookie. Failures are logged, never thrown, and
+     * do not prevent the cookie from being persisted nor the state from being reset.
+     */
+    public void disconnet()
+    {
+        // Must be set first, so that connectionClosed() does not try to reconnect
+        disconnected = true;
+
+        try
+        {
+            if ( refreshThread != null )
+            {
+                refreshThread.stopRefreshing();
+            }
+
+            if ( connection != null )
+            {
+                try
+                {
+                    connection.unBind();
+                    LOG.info( "Unbound from the server {}", config.getRemoteHost() );
+                }
+                finally
+                {
+                    // Close the connection even if the unbind failed
+                    connection.close();
+                    LOG.info( "Connection closed for the server {}", config.getRemoteHost() );
+                }
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to close the connection", e );
+        }
+        finally
+        {
+            connection = null;
+
+            // persist the cookie
+            storeCookie();
+
+            // reset the cookie
+            syncCookie = null;
+        }
+    }
+
+
+    /**
+     * Stores the current cookie in the cookie file, unless there is no cookie
+     * or it is the one that was saved last time. The file contains the cookie
+     * length, as a 4 bytes big endian integer, followed by the cookie itself.
+     * Failures are logged, never thrown.
+     */
+    private void storeCookie()
+    {
+        byte[] cookie = syncCookie;
+
+        if ( cookie == null )
+        {
+            return;
+        }
+
+        if ( Arrays.equals( cookie, lastSavedCookie ) )
+        {
+            return;
+        }
+
+        try
+        {
+            java.io.DataOutputStream out = new java.io.DataOutputStream( new FileOutputStream( cookieFile ) );
+
+            try
+            {
+                // A single byte would not be enough for cookies longer than 255 bytes
+                out.writeInt( cookie.length );
+                out.write( cookie );
+            }
+            finally
+            {
+                out.close();
+            }
+
+            lastSavedCookie = cookie.clone();
+
+            LOG.debug( "stored the cookie" );
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to store the cookie", e );
+        }
+    }
+
+
+    /**
+     * Reads the cookie back from the cookie file, if this file exists and is
+     * not empty, using the layout written by storeCookie(). The current cookie
+     * is left untouched when the file is absent, truncated or corrupted.
+     * Failures are logged, never thrown.
+     */
+    private void readCookie()
+    {
+        try
+        {
+            if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
+            {
+                byte[] stored;
+                java.io.DataInputStream in = new java.io.DataInputStream( new FileInputStream( cookieFile ) );
+
+                try
+                {
+                    int length = in.readInt();
+
+                    // Protect ourselves against a corrupted length
+                    if ( ( length < 0 ) || ( length > cookieFile.length() - 4 ) )
+                    {
+                        throw new java.io.IOException( "Corrupted cookie file " + cookieFile );
+                    }
+
+                    stored = new byte[length];
+
+                    // Unlike read(), readFully() fails if the cookie is truncated
+                    in.readFully( stored );
+                }
+                finally
+                {
+                    in.close();
+                }
+
+                syncCookie = stored;
+                lastSavedCookie = stored.clone();
+
+                LOG.debug( "read the cookie from file: " + Strings.utf8ToString(syncCookie) );
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to read the cookie", e );
+        }
+    }
+
+
+    /**
+     * Deletes the cookie file, if it exists and is not empty, then forgets
+     * both the current cookie and the last saved one.
+     */
+    public void removeCookie()
+    {
+        boolean hasStoredCookie = cookieFile.exists() && ( cookieFile.length() > 0 );
+
+        if ( hasStoredCookie )
+        {
+            // The outcome of the deletion is only logged
+            LOG.info( "deleted cookie file {}", cookieFile.delete() );
+        }
+
+        LOG.info( "resetting sync cookie" );
+
+        lastSavedCookie = null;
+        syncCookie = null;
+    }
+
+
+    /**
+     * Handles a MODDN notification. In this mock, the move, rename and
+     * moveAndRename operations are only decoded and logged: the calls that
+     * would apply them are commented out.
+     *
+     * @param modDnControl the control describing the modifyDn operation
+     * @throws Exception if one of the Dn or Rdn carried by the control cannot be built
+     */
+    private void applyModDnOperation( SyncModifyDn modDnControl ) throws Exception
+    {
+        SyncModifyDnType operation = modDnControl.getModDnType();
+        Dn entryDn = new Dn( modDnControl.getEntryDn() );
+
+        switch ( operation )
+        {
+            case MOVE:
+            {
+                LOG.debug( "moving {} to the new parent {}", entryDn, modDnControl.getNewSuperiorDn() );
+
+                //session.move( entryDn, new Dn( modDnControl.getNewSuperiorDn() ) );
+                break;
+            }
+
+            case RENAME:
+            {
+                Rdn renamedRdn = new Rdn( modDnControl.getNewRdn() );
+                boolean dropOldRdn = modDnControl.isDeleteOldRdn();
+
+                LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}", new String[]
+                    { entryDn.getName(), renamedRdn.getName(), String.valueOf( dropOldRdn ) } );
+
+                //session.rename( entryDn, newRdn, deleteOldRdn );
+                break;
+            }
+
+            case MOVEANDRENAME:
+            {
+                Dn targetParentDn = new Dn( modDnControl.getNewSuperiorDn() );
+                Rdn targetRdn = new Rdn( modDnControl.getNewRdn() );
+                boolean dropOldRdn = modDnControl.isDeleteOldRdn();
+
+                LOG.debug(
+                    "moveAndRename on the Dn {} with new newParent Dn {}, new Rdn {} and deleteOldRdn flag set to {}",
+                    new String[]
+                        { entryDn.getName(), targetParentDn.getName(), targetRdn.getName(), String.valueOf( dropOldRdn ) } );
+
+                //session.moveAndRename( entryDn, newParentDn, newRdn, deleteOldRdn );
+                break;
+            }
+        }
+    }
+
+
+ /**
+ * Handles a MODIFY notification. In this mock the method does nothing: the
+ * whole implementation, which computes and applies the modifications turning
+ * the local entry into the remote one, is commented out below.
+ *
+ * @param remoteEntry the modified entry, as sent by the provider (unused)
+ */
+ private void modify( Entry remoteEntry ) throws Exception
+ {
+ // Disabled implementation, kept for reference: it relies on a 'session' that
+ // is not declared in the visible part of this class
+ /*
+ Entry localEntry = session.lookup( remoteEntry.getDn() );
+
+ remoteEntry.removeAttributes( MOD_IGNORE_AT );
+
+ List<Modification> mods = new ArrayList<Modification>();
+ Iterator<Attribute> itr = localEntry.iterator();
+
+ while ( itr.hasNext() )
+ {
+ Attribute localAttr = itr.next();
+ String attrId = localAttr.getId();
+ Modification mod;
+ Attribute remoteAttr = remoteEntry.get( attrId );
+
+ if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
+ {
+ mod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
+ remoteEntry.remove( remoteAttr );
+ }
+ else
+ {
+ mod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
+ }
+
+ mods.add( mod );
+ }
+
+ if ( remoteEntry.size() > 0 )
+ {
+ itr = remoteEntry.iterator();
+ while ( itr.hasNext() )
+ {
+ mods.add( new DefaultModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
+ }
+ }
+
+ session.modify( remoteEntry.getDn(), mods );
+ */
+ }
+
+
+    /**
+     * Deletes entries according to the given list of UUIDs. Nothing is done
+     * for a null or empty list. Every UUID is logged first, then:
+     * <ul>
+     * <li>a refreshPresent list is processed in one go</li>
+     * <li>any other list is processed by consecutive slices of at most 10 UUIDs</li>
+     * </ul>
+     *
+     * @param uuidList the list of UUIDs
+     * @param isRefreshPresent a flag indicating the type of entries present in the UUID list
+     * @throws Exception in case of any problems while deleting the entries
+     */
+    public void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent ) throws Exception
+    {
+        if ( ( uuidList == null ) || uuidList.isEmpty() )
+        {
+            return;
+        }
+
+        for ( byte[] uuid : uuidList )
+        {
+            LOG.info( "uuid: {}", Strings.uuidToString(uuid) );
+        }
+
+        // if it is refreshPresent list then send all the UUIDs for
+        // filtering, otherwise breaking the list will cause the
+        // other present entries to be deleted from DIT
+        if ( isRefreshPresent )
+        {
+            LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() );
+            _deleteEntries_( uuidList, isRefreshPresent );
+            return;
+        }
+
+        final int sliceSize = 10;
+        int total = uuidList.size();
+
+        // Full slices first, the last one holds whatever remains
+        for ( int from = 0; from < total; from += sliceSize )
+        {
+            int to = Math.min( from + sliceSize, total );
+            _deleteEntries_( uuidList.subList( from, to ), isRefreshPresent );
+        }
+    }
+
+
+ /**
+  * Builds the entryUUID filter selecting the entries to delete. Do not call this
+  * method directly, go through deleteEntries() instead.
+  * <p>
+  * For a refreshPresent list the filter matches the entries whose entryUUID is
+  * none of the listed ones (a conjunction of negated equalities); otherwise it
+  * matches the entries whose entryUUID is one of the listed ones (a disjunction
+  * of equalities). A list with exactly one UUID yields the bare, possibly
+  * negated, equality node. In this mock the search and the deletion that would
+  * use the filter are disabled.
+  *
+  * @param limitedUuidList a list of UUIDs whose size is less than or equal to the node limit
+  *        used by deleteEntries() (the limit applies only for a refreshDeletes list)
+  * @param isRefreshPresent a flag indicating the type of entries present in the UUID list
+  * @throws Exception if the base Dn taken from the configuration cannot be built
+  */
+ private void _deleteEntries_( List<byte[]> limitedUuidList, boolean isRefreshPresent ) throws Exception
+ {
+     ExprNode filter;
+
+     if ( limitedUuidList.size() == 1 )
+     {
+         String uuidText = Strings.uuidToString(limitedUuidList.get(0));
+         ExprNode single = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT,
+             new org.apache.directory.shared.ldap.model.entry.StringValue( uuidText ) );
+
+         filter = isRefreshPresent ? new NotNode( single ) : single;
+     }
+     else if ( isRefreshPresent )
+     {
+         // keep everything that is listed: select what matches none of the UUIDs
+         AndNode noneOf = new AndNode();
+
+         for ( byte[] rawUuid : limitedUuidList )
+         {
+             String uuidText = Strings.uuidToString(rawUuid);
+             ExprNode sameUuid = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT,
+                 new org.apache.directory.shared.ldap.model.entry.StringValue( uuidText ) );
+
+             noneOf.addNode( new NotNode( sameUuid ) );
+         }
+
+         filter = noneOf;
+     }
+     else
+     {
+         // select what matches any of the UUIDs
+         OrNode anyOf = new OrNode();
+
+         for ( byte[] rawUuid : limitedUuidList )
+         {
+             String uuidText = Strings.uuidToString(rawUuid);
+             ExprNode sameUuid = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT,
+                 new org.apache.directory.shared.ldap.model.entry.StringValue( uuidText ) );
+
+             anyOf.addNode( sameUuid );
+         }
+
+         filter = anyOf;
+     }
+
+     // search base; only consumed by the disabled code below
+     Dn dn = new Dn( schemaManager, config.getBaseDn() );
+
+     /*
+     LOG.debug( "selecting entries to be deleted using filter {}", filter.toString() );
+     EntryFilteringCursor cursor = session.search( dn, SearchScope.SUBTREE, filter,
+         AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET );
+     cursor.beforeFirst();
+
+     while ( cursor.next() )
+     {
+         Entry entry = cursor.get();
+         deleteRecursive( entry.getDn(), null );
+     }
+
+     cursor.close();
+     */
+ }
+
+ /**
+ * A Thread implementation for synchronizing the DIT in refreshOnly mode
+ */
+ private class RefresherThread extends Thread
+ {
+ /** A field used to tell the thread it should stop */
+ private volatile boolean stop = false;
+
+ /** A mutex used to make the thread sleeping for a moment */
+ private final Object mutex = new Object();
+
+ public RefresherThread()
+ {
+ setDaemon( true );
+ }
+
+
+ public void run()
+ {
+ while ( !stop )
+ {
+ LOG.debug( "==================== Refresh Only ==========" );
+
+ try
+ {
+ doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false );
+
+ LOG.info( "--------------------- Sleep for a little while ------------------" );
+ mutex.wait( config.getRefreshInterval() );
+ LOG.debug( "--------------------- syncing again ------------------" );
+
+ }
+ catch ( InterruptedException ie )
+ {
+ LOG.warn( "refresher thread interrupted" );
+ }
+ catch ( Exception e )
+ {
+ LOG.error( "Failed to sync with refresh only mode", e );
+ }
+ }
+ }
+
+
+ public void stopRefreshing()
+ {
+ stop = true;
+
+ // just in case if it is sleeping, wake up the thread
+ mutex.notify();
+ }
+ }
+
+
+ /**
+  * Removes all the entries located under the given Dn, and finally the Dn itself.
+  * <p>
+  * In this mock the search and delete calls are disabled: the method only logs
+  * the Dn and creates the cursor map when none was supplied.
+  * <p>
+  * Intended working (see the disabled code): the method is recursive and keeps
+  * one search cursor per Dn in the map. A one level search is opened on the Dn;
+  * when the cursor is empty the Dn is a leaf and is deleted, otherwise the
+  * method calls itself for each child returned by the cursor, passing the same
+  * map, and deletes the Dn once all its children are gone.
+  * <p>
+  * A search cursor is opened because an entry <em>might</em> have children,
+  * as in the DIT fragment below, where the approach pays off as soon as the
+  * depth of the tree is greater than 1:
+  * <pre>
+  *            parent
+  *            /    \
+  *       child1    child2
+  *                 /    \
+  *           grand21    grand22
+  * </pre>
+  *
+  * @param rootDn the Dn which will be removed after removing its children
+  * @param cursorMap a map to hold the Cursor related to a Dn, created when null
+  * @throws Exception If the Dn is not valid or if the deletion failed
+  */
+ private void deleteRecursive( Dn rootDn, Map<Dn, EntryFilteringCursor> cursorMap ) throws Exception
+ {
+     LOG.debug( "searching for {}", rootDn.getName() );
+
+     // only consumed by the disabled code below
+     EntryFilteringCursor cursor = null;
+
+     try
+     {
+         if ( null == cursorMap )
+         {
+             // first level of the recursion: start with an empty map
+             cursorMap = new HashMap<Dn, EntryFilteringCursor>();
+         }
+
+         /*
+         cursor = cursorMap.get( rootDn );
+
+         if ( cursor == null )
+         {
+             cursor = session.search( rootDn, SearchScope.ONELEVEL, ENTRY_UUID_PRESENCE_FILTER,
+                 AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET );
+             cursor.beforeFirst();
+             LOG.debug( "putting cursor for {}", rootDn.getName() );
+             cursorMap.put( rootDn, cursor );
+         }
+
+         if ( !cursor.next() ) // if this is a leaf entry's Dn
+         {
+             LOG.debug( "deleting {}", rootDn.getName() );
+             cursorMap.remove( rootDn );
+             cursor.close();
+             session.delete( rootDn );
+         }
+         else
+         {
+             do
+             {
+                 Entry entry = cursor.get();
+
+                 deleteRecursive( entry.getDn(), cursorMap );
+             }
+             while ( cursor.next() );
+
+             cursorMap.remove( rootDn );
+             cursor.close();
+             LOG.debug( "deleting {}", rootDn.getName() );
+             session.delete( rootDn );
+         }
+         */
+     }
+     catch ( Exception cause )
+     {
+         // report the failing Dn, then let the caller deal with the failure
+         LOG.error( "Failed to delete child entries under the Dn " + rootDn.getName(), cause );
+         throw cause;
+     }
+ }
+
+
+ /**
+  * Gives the value currently held by the nbAdded counter.
+  *
+  * @return the nbAdded
+  */
+ public int getNbAdded()
+ {
+     final int added = nbAdded.get();
+
+     return added;
+ }
+}