You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2010/01/20 13:19:33 UTC
svn commit: r901154 - in
/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl:
SyncReplConsumer.java SyncreplRunnerUI.java
Author: kayyagari
Date: Wed Jan 20 12:19:32 2010
New Revision: 901154
URL: http://svn.apache.org/viewvc?rev=901154&view=rev
Log:
o started handling refreshOnly mode of synchronization
o lots of code reorg and formatting
Modified:
directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java
directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java
Modified: directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java?rev=901154&r1=901153&r2=901154&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java (original)
+++ directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncReplConsumer.java Wed Jan 20 12:19:32 2010
@@ -23,7 +23,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -53,7 +52,6 @@
import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlDecoder;
import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlCodec;
import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlDecoder;
-import org.apache.directory.shared.ldap.constants.SchemaConstants;
import org.apache.directory.shared.ldap.entry.Entry;
import org.apache.directory.shared.ldap.entry.EntryAttribute;
import org.apache.directory.shared.ldap.entry.Modification;
@@ -107,27 +105,35 @@
/** the schema manager */
private SchemaManager schemaManager;
-
+
/** the decoder for syncinfovalue control */
private SyncInfoValueControlDecoder decoder = new SyncInfoValueControlDecoder();
/** the cookie file */
private File cookieFile;
-
+
/** flag to indicate whether the consumer was disconnected */
private boolean disconnected;
/** the core session */
private CoreSession session;
-
+
private SyncDoneValueControlDecoder syncDoneControlDecoder = new SyncDoneValueControlDecoder();
-
+
private SyncStateValueControlDecoder syncStateControlDecoder = new SyncStateValueControlDecoder();
/** attributes on which modification should be ignored */
- private static final String[] MOD_IGNORE_AT = new String[]{ "entryUUID", "entryCSN"}; //{ "1.3.6.1.1.16.4", "1.3.6.1.4.1.4203.666.1.7" };
-
-
+ private static final String[] MOD_IGNORE_AT = new String[] { "entryUUID", "entryCSN" }; //{ "1.3.6.1.1.16.4", "1.3.6.1.4.1.4203.666.1.7" };
+
+ /** flag to indicate whether the current phase is for deleting entries */
+ private boolean refreshDeletes;
+
+ /** flag set after receiving refreshPresent Sync Info message */
+ private boolean refreshDone;
+
+ private RefresherThread refreshThread;
+
+
/**
* @return the config
*/
@@ -146,27 +152,17 @@
}
- /**
- * A helper method to quickly quit the program
- */
- private static void quit( LdapConnection connection ) throws IOException
- {
- connection.close();
- System.exit( 1 );
- }
-
-
public void init( DirectoryService directoryservice ) throws Exception
{
this.directoryService = directoryservice;
File cookieDir = new File( directoryservice.getWorkingDirectory(), "cookies" );
cookieDir.mkdir();
-
+
cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) );
-
+
session = directoryService.getAdminSession();
-
+
schemaManager = directoryservice.getSchemaManager();
}
@@ -179,7 +175,7 @@
int port = config.getPort();
// Create a connection
- if( connection == null )
+ if ( connection == null )
{
connection = new LdapConnection( providerHost, port );
}
@@ -225,7 +221,7 @@
String baseDn = config.getBaseDn();
searchRequest = new SearchRequest();
-
+
searchRequest.setBaseDn( baseDn );
searchRequest.setFilter( config.getFilter() );
searchRequest.setSizeLimit( config.getSearchSizeLimit() );
@@ -236,16 +232,7 @@
searchRequest.setScope( SearchScope.getSearchScope( config.getSearchScope() ) );
searchRequest.setTypesOnly( false );
- String[] attributes = config.getAttributes();
-
- if ( attributes == null )
- {
- searchRequest.addAttributes( SchemaConstants.ALL_USER_ATTRIBUTES );
- }
- else
- {
- searchRequest.addAttributes( attributes );
- }
+ searchRequest.addAttributes( config.getAttributes() );
syncReq = new SyncRequestValueControl();
@@ -259,33 +246,21 @@
}
syncReq.setReloadHint( false );
-
- Control control = new SyncRequestValueControl();
- ((SyncRequestValueControl)control).setCookie( syncReq.getEncodedValue() );
-
- try
- {
- searchRequest.add( control );
- }
- catch( LdapException e )
- {
- // shouldn't happen
- LOG.error( "Failed to add constrol to the search request", e );
- }
}
public void handleSearchDone( SearchResultDone searchDone )
{
LOG.debug( "///////////////// handleSearchDone //////////////////" );
-
+
Control ctrl = searchDone.getControl( SyncDoneValueControl.CONTROL_OID );
SyncDoneValueControlCodec syncDoneCtrl = null;
try
{
syncDoneCtrl = ( SyncDoneValueControlCodec ) syncDoneControlDecoder.decode( ctrl.getEncodedValue() );
+ refreshDeletes = syncDoneCtrl.isRefreshDeletes();
}
- catch( Exception e )
+ catch ( Exception e )
{
LOG.error( "Failed to decode the syncDoneControlCodec", e );
}
@@ -295,36 +270,6 @@
syncCookie = syncDoneCtrl.getCookie();
LOG.debug( "assigning cookie from sync done value control: " + StringTools.utf8ToString( syncCookie ) );
}
- else
- {
- LOG.info( "cookie in syncdone message is null" );
- }
-
- if ( !config.isRefreshPersist() )
- {
- // Now, switch to refreshAndPresist
- config.setRefreshPersist( true );
- LOG.debug( "Swithing to RefreshAndPersist" );
-
- try
- {
- // the below call is required to send the updated cookie
- // and refresh mode change (i.e to refreshAndPersist stage)
- // cause while the startSync() method sleeps even after the 'sync done'
- // message arrives as part of initial searchRequest with 'refreshOnly' mode.
- // During this sleep time any 'modifications' happened on the server
- // to already fetched entries will be sent as SearchResultEntries with
- // SyncState value as 'ADD' which will conflict with the DNs of initially received entries
-
- // TODO thinking of alternative ways to implement this in case of large DITs
- // where the modification to entry(ies) can happen before the initial sync is done
- doSyncSearch();
- }
- catch( Exception e )
- {
- LOG.error( "Failed to send a search request with RefreshAndPersist mode", e );
- }
- }
LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
}
@@ -345,14 +290,24 @@
{
Entry remoteEntry = syncResult.getEntry();
+ // for refreshOnly
+ if ( !config.isRefreshPersist() )
+ {
+ if ( !refreshDeletes )
+ {
+ LOG.info( "the number of attributes present in the entry {} during present phase {}", remoteEntry
+ .getDn().getName(), remoteEntry.size() );
+ }
+ }
+
Control ctrl = syncResult.getControl( SyncStateValueControl.CONTROL_OID );
SyncStateValueControlCodec syncStateCtrl = null;
-
+
try
{
syncStateCtrl = ( SyncStateValueControlCodec ) syncStateControlDecoder.decode( ctrl.getEncodedValue() );
}
- catch( Exception e )
+ catch ( Exception e )
{
LOG.error( "Failed to decode syncStateControl", e );
}
@@ -372,7 +327,7 @@
switch ( state )
{
- case ADD :
+ case ADD:
if ( !session.exists( remoteEntry.getDn() ) )
{
LOG.debug( "adding entry with dn {}", remoteEntry.getDn().getName() );
@@ -381,12 +336,12 @@
}
break;
-
- case MODIFY :
+
+ case MODIFY:
modify( remoteEntry );
break;
- case DELETE :
+ case DELETE:
LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
session.delete( remoteEntry.getDn() );
break;
@@ -403,7 +358,6 @@
/**
* {@inheritDoc}
- * atm does nothinng except examinig and printing the content of syncinfovalue control
*/
public void handleSyncInfo( byte[] syncinfo )
{
@@ -424,26 +378,29 @@
List<byte[]> uuidList = syncInfoValue.getSyncUUIDs();
LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() );
- if( uuidList != null )
+ if ( uuidList != null )
{
- for( byte[] uuid : uuidList )
+ for ( byte[] uuid : uuidList )
{
LOG.info( "uuid: {}", StringTools.utf8ToString( uuid ) );
}
}
+ refreshDeletes = syncInfoValue.isRefreshDeletes();
+ refreshDone = syncInfoValue.isRefreshDone();
+
// if refreshDeletes set to true then delete all the entries with entryUUID
// present in the syncIdSet
- if( syncInfoValue.isRefreshDeletes() && ( uuidList != null ) )
+ if ( syncInfoValue.isRefreshDeletes() && ( uuidList != null ) )
{
- for( byte[] uuid : uuidList )
+ for ( byte[] uuid : uuidList )
{
// TODO similar to delete based on DN there should be
// a method to delete an Entry based on entryUUID
LOG.debug( "FIXME deleting the entry with entryUUID: {}", UUID.nameUUIDFromBytes( uuid ) );
}
}
-
+
LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
}
catch ( DecoderException de )
@@ -461,20 +418,20 @@
*/
public void handleSessionClosed()
{
- if( disconnected )
+ if ( disconnected )
{
return;
}
-
+
boolean connected = false;
-
- while( !connected )
+
+ while ( !connected )
{
try
{
Thread.sleep( config.getConsumerInterval() );
}
- catch( InterruptedException e )
+ catch ( InterruptedException e )
{
LOG.error( "Interrupted while sleeping before trying to reconnect", e );
}
@@ -482,105 +439,36 @@
LOG.debug( "Trying to reconnect" );
connected = bind();
}
-
+
startSync();
}
/**
- * starts the syn operation
+ * starts the synchronization operation
*/
public void startSync()
{
// read the cookie if persisted
readCookie();
-
- if( config.isRefreshPersist() )
+
+ if ( config.isRefreshPersist() )
{
try
{
LOG.debug( "==================== Refresh And Persist ==========" );
- doSyncSearch();
+ doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST );
}
- catch( Exception e )
+ catch ( Exception e )
{
LOG.error( "Failed to sync with refreshAndPersist mode", e );
}
-
- return;
}
-
- // continue till refreshAndPersist mode is not set
- while( !config.isRefreshPersist() )
- {
-
- LOG.debug( "==================== Refresh Only ==========" );
-
- try
- {
- if ( ( syncReq.getCookie() == null ) || ( syncReq.getCookie().length == 0 ) )
- {
- LOG.debug( "First search (no cookie)" );
- }
- else
- {
- LOG.debug( "searching with searchRequest, cookie '{}'", StringTools.utf8ToString( syncReq.getCookie() ) );
- }
-
- doSyncSearch();
-
- LOG.info( "--------------------- Sleep for a little while ------------------" );
- Thread.sleep( config.getConsumerInterval() );
- LOG.debug( "--------------------- syncing again ------------------" );
-
- }
- catch ( Exception e )
- {
- LOG.error( "Failed to sync with refresh only mode", e );
- }
+ else
+ {
+ refreshThread = new RefresherThread();
+ refreshThread.start();
}
-
- LOG.debug( "**************** Done with the initial sync ***************" );
- }
-
-
- //====================== SearchListener methods ====================================
-
-
- /* (non-Javadoc)
- * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#entryFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry)
- */
- public void entryFound( LdapConnection connection, SearchResultEntry searchResultEntry ) throws LdapException
- {
- handleSearchResult( searchResultEntry );
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#referralFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultReference)
- */
- public void referralFound( LdapConnection connection, SearchResultReference searchResultReference )
- throws LdapException
- {
- handleSearchReference( searchResultReference );
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#searchDone(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultDone)
- */
- public void searchDone( LdapConnection connection, SearchResultDone searchResultDone ) throws LdapException
- {
- handleSearchDone( searchResultDone );
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener#responseReceived(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.IntermediateResponse)
- */
- public void responseReceived( LdapConnection connection, IntermediateResponse intermediateResponse )
- {
- handleSyncInfo( intermediateResponse.getResponseValue() );
}
@@ -589,37 +477,35 @@
*
* @throws Exception in case of any problems encountered while searching
*/
- private void doSyncSearch() throws Exception
+ private void doSyncSearch( SynchronizationModeEnum syncType ) throws Exception
{
SyncRequestValueControl syncReq = new SyncRequestValueControl();
-
- if( config.isRefreshPersist() )
- {
- syncReq.setMode( SynchronizationModeEnum.REFRESH_AND_PERSIST );
- }
+ syncReq.setMode( syncType );
if ( syncCookie != null )
{
+ LOG.debug( "searching with searchRequest, cookie '{}'", StringTools.utf8ToString( syncCookie ) );
syncReq.setCookie( syncCookie );
}
-
- SyncRequestValueControl syncReqControl = ( SyncRequestValueControl ) searchRequest.getControl( SyncRequestValueControl.CONTROL_OID );
-
- searchRequest.remove( syncReqControl );
searchRequest.add( syncReq );
-
+
// Do the search
connection.search( searchRequest, this );
}
-
+
public void disconnet()
{
disconnected = true;
try
{
+ if ( refreshThread != null )
+ {
+ refreshThread.stopRefreshing();
+ }
+
connection.unBind();
LOG.info( "Unbound from the server {}", config.getProviderHost() );
@@ -634,7 +520,7 @@
{
LOG.error( "Failed to close the connection", e );
}
-
+
}
@@ -643,26 +529,26 @@
*/
private void storeCookie()
{
- if( syncCookie == null )
+ if ( syncCookie == null )
{
return;
}
-
+
try
{
FileOutputStream fout = new FileOutputStream( cookieFile );
fout.write( syncCookie.length );
fout.write( syncCookie );
fout.close();
-
+
LOG.debug( "stored the cookie" );
}
- catch( Exception e )
+ catch ( Exception e )
{
LOG.error( "Failed to store the cookie", e );
}
}
-
+
/**
* read the cookie from a file(if exists).
@@ -671,56 +557,55 @@
{
try
{
- if( cookieFile.exists() && ( cookieFile.length() > 0 ) )
+ if ( cookieFile.exists() && ( cookieFile.length() > 0 ) )
{
FileInputStream fin = new FileInputStream( cookieFile );
- syncCookie = new byte[ fin.read() ];
+ syncCookie = new byte[fin.read()];
fin.read( syncCookie );
fin.close();
-
+
LOG.debug( "read the cookie from file: " + StringTools.utf8ToString( syncCookie ) );
}
}
- catch( Exception e )
+ catch ( Exception e )
{
LOG.error( "Failed to read the cookie", e );
}
}
-
+
/**
* deletes the cookie file(if exists)
*/
public void deleteCookieFile()
{
- if( cookieFile != null && cookieFile.exists() )
+ if ( cookieFile != null && cookieFile.exists() )
{
LOG.debug( "deleting the cookie file" );
cookieFile.delete();
}
}
-
-
- private void modify( Entry remoteEntry ) throws Exception
+
+
+ private void modify( Entry remoteEntry ) throws Exception
{
LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
-
+
Entry localEntry = session.lookup( remoteEntry.getDn() );
-
+
remoteEntry.removeAttributes( MOD_IGNORE_AT );
List<Modification> mods = new ArrayList<Modification>();
Iterator<EntryAttribute> itr = localEntry.iterator();
-
+
while ( itr.hasNext() )
{
EntryAttribute localAttr = itr.next();
String attrId = localAttr.getId();
Modification mod;
EntryAttribute remoteAttr = remoteEntry.get( attrId );
-
-
- if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
+
+ if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
{
mod = new ServerModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
remoteEntry.remove( remoteAttr );
@@ -729,19 +614,108 @@
{
mod = new ServerModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
}
-
+
mods.add( mod );
}
- if( remoteEntry.size() > 0 )
+ if ( remoteEntry.size() > 0 )
{
itr = remoteEntry.iterator();
- while( itr.hasNext() )
+ while ( itr.hasNext() )
{
mods.add( new ServerModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
}
}
-
+
session.modify( remoteEntry.getDn(), mods );
}
+
+ /**
+  * A daemon thread that synchronizes the DIT in refreshOnly mode.
+  *
+  * Each cycle issues one refreshOnly sync search and then pauses for the
+  * configured consumer interval. The pause is taken after every search
+  * attempt, including a failed one, so that a failing provider is retried
+  * once per interval instead of in a tight loop.
+  */
+ private class RefresherThread extends Thread
+ {
+     /** set by stopRefreshing(); volatile because it is written by another thread */
+     private volatile boolean stop;
+
+
+     public RefresherThread()
+     {
+         setDaemon( true );
+     }
+
+
+     /**
+      * Runs search/pause cycles until stopRefreshing() has been called.
+      */
+     public void run()
+     {
+         while ( !stop )
+         {
+             LOG.debug( "==================== Refresh Only ==========" );
+
+             try
+             {
+                 doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY );
+             }
+             catch ( InterruptedException ie )
+             {
+                 LOG.warn( "refresher thread interrupted" );
+                 // the loop condition re-checks the stop flag
+                 continue;
+             }
+             catch ( Exception e )
+             {
+                 // log and fall through to the pause below, so a failing search
+                 // is not retried immediately
+                 LOG.error( "Failed to sync with refresh only mode", e );
+             }
+
+             try
+             {
+                 LOG.info( "--------------------- Sleep for a little while ------------------" );
+                 Thread.sleep( config.getConsumerInterval() );
+                 LOG.debug( "--------------------- syncing again ------------------" );
+             }
+             catch ( InterruptedException ie )
+             {
+                 LOG.warn( "refresher thread interrupted" );
+             }
+         }
+     }
+
+
+     /**
+      * Asks the thread to finish: sets the stop flag and interrupts the thread.
+      */
+     public void stopRefreshing()
+     {
+         stop = true;
+         // just in case it is sleeping
+         this.interrupt();
+     }
+ }
+
+
+ //====================== SearchListener methods ====================================
+
+ /* (non-Javadoc)
+ * Search-entry callback: hands the received entry to handleSearchResult().
+ * The connection argument is not used here.
+ *
+ * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#entryFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry)
+ */
+ public void entryFound( LdapConnection connection, SearchResultEntry searchResultEntry ) throws LdapException
+ {
+ handleSearchResult( searchResultEntry );
+ }
+
+
+ /* (non-Javadoc)
+ * Search-reference callback: hands the received reference to handleSearchReference().
+ * The connection argument is not used here.
+ *
+ * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#referralFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultReference)
+ */
+ public void referralFound( LdapConnection connection, SearchResultReference searchResultReference )
+ throws LdapException
+ {
+ handleSearchReference( searchResultReference );
+ }
+
+
+ /* (non-Javadoc)
+ * Search-done callback: hands the received SearchResultDone to handleSearchDone().
+ * The connection argument is not used here.
+ *
+ * @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#searchDone(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultDone)
+ */
+ public void searchDone( LdapConnection connection, SearchResultDone searchResultDone ) throws LdapException
+ {
+ handleSearchDone( searchResultDone );
+ }
+
+
+ /* (non-Javadoc)
+ * Intermediate-response callback: passes the response value of the received
+ * message to handleSyncInfo(). The connection argument is not used here.
+ *
+ * @see org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener#responseReceived(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.IntermediateResponse)
+ */
+ public void responseReceived( LdapConnection connection, IntermediateResponse intermediateResponse )
+ {
+ handleSyncInfo( intermediateResponse.getResponseValue() );
+ }
+
}
Modified: directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java?rev=901154&r1=901153&r2=901154&view=diff
==============================================================================
--- directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java (original)
+++ directory/apacheds/trunk/syncrepl/src/main/java/org/apache/directory/server/syncrepl/SyncreplRunnerUI.java Wed Jan 20 12:19:32 2010
@@ -119,6 +119,8 @@
config.setAttributes( "*,entryUUID,entryCSN" );
config.setSearchScope( SearchScope.SUBTREE.getJndiScope() );
config.setReplicaId( 1 );
+ config.setRefreshPersist( false );
+ config.setConsumerInterval( 60 * 1000 );
agent.setConfig( config );
workDir = new File( System.getProperty( "java.io.tmpdir" ) + "/syncrepl-work" );