You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2012/10/16 15:27:20 UTC
svn commit: r1398782 [2/2] - in /directory/apacheds/trunk:
core-api/src/main/java/org/apache/directory/server/core/api/event/
core-api/src/main/java/org/apache/directory/server/core/api/interceptor/context/
core-jndi/src/main/java/org/apache/directory/...
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java Tue Oct 16 13:27:19 2012
@@ -26,18 +26,24 @@ import static org.apache.directory.serve
import static org.apache.directory.server.ldap.LdapServer.NO_TIME_LIMIT;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.directory.server.constants.ServerDNConstants;
import org.apache.directory.server.core.api.DirectoryService;
+import org.apache.directory.server.core.api.event.DirectoryListenerAdapter;
+import org.apache.directory.server.core.api.event.EventService;
import org.apache.directory.server.core.api.event.EventType;
import org.apache.directory.server.core.api.event.NotificationCriteria;
import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
import org.apache.directory.server.i18n.I18n;
import org.apache.directory.server.ldap.LdapProtocolUtils;
import org.apache.directory.server.ldap.LdapServer;
@@ -84,13 +90,13 @@ import org.apache.directory.shared.ldap.
import org.apache.directory.shared.ldap.model.message.SearchScope;
import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
import org.apache.directory.shared.ldap.model.message.controls.ManageDsaIT;
+import org.apache.directory.shared.ldap.model.name.Dn;
import org.apache.directory.shared.ldap.model.schema.AttributeType;
import org.apache.directory.shared.ldap.model.url.LdapUrl;
import org.apache.directory.shared.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Class used to process the incoming synchronization request from the consumers.
*
@@ -126,6 +132,12 @@ public class SyncReplRequestHandler impl
private ReplConsumerManager replicaUtil;
+ private ConsumerLogEntryDeleteListener cledListener;
+
+ private ReplicaEventLogJanitor logJanitor;
+
+ private AttributeType CSN_AT;
+
/**
* Create a SyncReplRequestHandler empty instance
*/
@@ -153,12 +165,14 @@ public class SyncReplRequestHandler impl
this.ldapServer = server;
this.dirService = server.getDirectoryService();
-
+
+ CSN_AT = dirService.getSchemaManager()
+ .lookupAttributeTypeRegistry( SchemaConstants.ENTRY_CSN_AT );
+
OBJECT_CLASS_AT = dirService.getSchemaManager()
.lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT );
- File workDir = dirService.getInstanceLayout().getLogDirectory();
- syncReplData = new File( workDir, "syncrepl-data" );
+ syncReplData = dirService.getInstanceLayout().getReplDirectory();
if ( !syncReplData.exists() )
{
@@ -172,8 +186,18 @@ public class SyncReplRequestHandler impl
loadReplicaInfo();
+ logJanitor = new ReplicaEventLogJanitor( replicaLogMap );
+ logJanitor.start();
+
registerPersistentSearches();
+ cledListener = new ConsumerLogEntryDeleteListener();
+ NotificationCriteria criteria = new NotificationCriteria();
+ criteria.setBase( new Dn( dirService.getSchemaManager(), ServerDNConstants.REPL_CONSUMER_DN_STR ) );
+ criteria.setEventMask( EventType.DELETE );
+
+ dirService.getEventService().addListener( cledListener, criteria );
+
Thread consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask() );
consumerInfoUpdateThread.setDaemon( true );
consumerInfoUpdateThread.start();
@@ -196,21 +220,27 @@ public class SyncReplRequestHandler impl
*/
public void stop()
{
+ logJanitor.stopCleaning();
+
+ EventService evtSrv = dirService.getEventService();
+
for ( ReplicaEventLog log : replicaLogMap.values() )
{
try
{
PROVIDER_LOG.debug( "Stopping the logging for replica ", log.getId() );
+ evtSrv.removeListener( log.getPersistentListener() );
log.stop();
}
catch ( Exception e )
{
- LOG.warn( "Failed to close the event log {}", log.getId() );
- LOG.warn( "", e );
+ LOG.warn( "Failed to close the event log {}", log.getId(), e );
PROVIDER_LOG.error( "Failed to close the event log {}", log.getId(), e );
}
}
-
+
+ evtSrv.removeListener( cledListener );
+
initialized = false;
}
@@ -290,57 +320,62 @@ public class SyncReplRequestHandler impl
/**
* Send all the stored modifications to the consumer
*/
- private String sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog,
- String consumerCsn )
+ private void sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog, String fromCsn )
throws Exception
{
// do the search from the log
- String lastSentCsn = clientMsgLog.getLastSentCsn();
-
- ReplicaJournalCursor cursor = clientMsgLog.getCursor( consumerCsn );
+ String lastSentCsn = fromCsn;
+ ReplicaJournalCursor cursor = clientMsgLog.getCursor( fromCsn );
+
PROVIDER_LOG.debug( "Processing the log for replica {}", clientMsgLog.getId() );
- while ( cursor.next() )
+ try
{
- ReplicaEventMessage replicaEventMessage = cursor.get();
- Entry entry = replicaEventMessage.getEntry();
- LOG.debug( "Read message from the queue {}", entry );
- PROVIDER_LOG.debug( "Read message from the queue {}", entry );
-
- lastSentCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
-
- ChangeType event = replicaEventMessage.getChangeType();
-
- // if event type is null, then it is a MODDN operation
- if ( event == ChangeType.MODDN )
- {
- sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.MODIFY );
- }
- else
+ while ( cursor.next() )
{
+ ReplicaEventMessage replicaEventMessage = cursor.get();
+ Entry entry = replicaEventMessage.getEntry();
+ LOG.debug( "Read message from the queue {}", entry );
+ PROVIDER_LOG.debug( "Read message from the queue {}", entry );
+
+ lastSentCsn = entry.get( CSN_AT ).getString();
+
+ ChangeType event = replicaEventMessage.getChangeType();
+
SyncStateTypeEnum syncStateType = null;
-
+
switch ( event )
{
- case ADD:
- case MODIFY:
+ case ADD :
syncStateType = SyncStateTypeEnum.ADD;
break;
-
- case DELETE:
+
+ case MODIFY :
+ syncStateType = SyncStateTypeEnum.MODIFY;
+ break;
+
+ case MODDN :
+ syncStateType = SyncStateTypeEnum.MODDN;
+
+ case DELETE :
syncStateType = SyncStateTypeEnum.DELETE;
break;
}
-
+
sendSearchResultEntry( session, req, entry, syncStateType );
+
+ clientMsgLog.setLastSentCsn( lastSentCsn );
+
+ PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN : {}", clientMsgLog.getId(), lastSentCsn );
}
+
+ PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId() );
+ }
+ finally
+ {
+ cursor.close();
}
-
- PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId() );
- cursor.close();
-
- return lastSentCsn;
}
@@ -351,57 +386,56 @@ public class SyncReplRequestHandler impl
private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog, String consumerCsn )
throws Exception
{
- boolean refreshNPersist = isRefreshNPersist( req );
-
- // if this method is called with refreshAndPersist
- // means the client was offline after it initiated a persistent synch session
- // we need to update the handler's session
- if ( refreshNPersist )
- {
- SyncReplSearchListener handler = replicaLog.getPersistentListener();
- handler.setSearchRequest( req );
- handler.setSession( session );
- }
-
- String lastSentCsn = sendContentFromLog( session, req, replicaLog, consumerCsn );
-
- PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN : {}", replicaLog.getId(),
- lastSentCsn );
- byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn );
-
- if ( refreshNPersist )
- {
- IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() );
- intermResp.setResponseName( SyncInfoValue.OID );
-
- SyncInfoValue syncInfo = new SyncInfoValueDecorator( ldapServer.getDirectoryService()
- .getLdapCodecService(),
- SynchronizationInfoEnum.NEW_COOKIE );
- syncInfo.setCookie( cookie );
- intermResp.setResponseValue( ( ( SyncInfoValueDecorator ) syncInfo ).getValue() );
-
- PROVIDER_LOG
- .debug( "Sent the intermediate response to the {} consumer, {}", replicaLog.getId(), intermResp );
- session.getIoSession().write( intermResp );
-
- replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
- }
- else
+ synchronized ( replicaLog )
{
- SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
- searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
- SyncDoneValue syncDone = new SyncDoneValueDecorator(
- ldapServer.getDirectoryService().getLdapCodecService() );
- syncDone.setCookie( cookie );
- searchDoneResp.addControl( syncDone );
-
- PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer", replicaLog.getId(),
- searchDoneResp );
-
- session.getIoSession().write( searchDoneResp );
+ boolean refreshNPersist = isRefreshNPersist( req );
+
+ // if this method is called with refreshAndPersist
+ // means the client was offline after it initiated a persistent synch session
+ // we need to update the handler's session
+ if ( refreshNPersist )
+ {
+ SyncReplSearchListener handler = replicaLog.getPersistentListener();
+ handler.setSearchRequest( req );
+ handler.setSession( session );
+ }
+
+ sendContentFromLog( session, req, replicaLog, consumerCsn );
+
+ String lastSentCsn = replicaLog.getLastSentCsn();
+
+ byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn );
+
+ if ( refreshNPersist )
+ {
+ IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() );
+ intermResp.setResponseName( SyncInfoValue.OID );
+
+ SyncInfoValue syncInfo = new SyncInfoValueDecorator( ldapServer.getDirectoryService()
+ .getLdapCodecService(),
+ SynchronizationInfoEnum.NEW_COOKIE );
+ syncInfo.setCookie( cookie );
+ intermResp.setResponseValue( ((SyncInfoValueDecorator)syncInfo).getValue() );
+
+ PROVIDER_LOG.debug( "Sent the intermediate response to the {} consumer, {}", replicaLog.getId(), intermResp );
+ session.getIoSession().write( intermResp );
+
+ replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
+ }
+ else
+ {
+ SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
+ searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
+ SyncDoneValue syncDone = new SyncDoneValueDecorator(
+ ldapServer.getDirectoryService().getLdapCodecService() );
+ syncDone.setCookie( cookie );
+ searchDoneResp.addControl( syncDone );
+
+ PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer", replicaLog.getId(), searchDoneResp );
+
+ session.getIoSession().write( searchDoneResp );
+ }
}
-
- replicaLog.setLastSentCsn( lastSentCsn );
}
@@ -428,7 +462,7 @@ public class SyncReplRequestHandler impl
StringValue contexCsnValue = new StringValue( contextCsn );
// modify the filter to include the context Csn
- GreaterEqNode csnGeNode = new GreaterEqNode( SchemaConstants.ENTRY_CSN_AT, contexCsnValue );
+ GreaterEqNode csnGeNode = new GreaterEqNode( CSN_AT, contexCsnValue );
ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
request.setFilter( postInitContentFilter );
@@ -457,32 +491,36 @@ public class SyncReplRequestHandler impl
dirService.getEventService().addListener( handler, criteria );
// then start pushing initial content
- LessEqNode csnNode = new LessEqNode( SchemaConstants.ENTRY_CSN_AT, contexCsnValue );
+ LessEqNode csnNode = new LessEqNode( CSN_AT, contexCsnValue );
// modify the filter to include the context Csn
ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
request.setFilter( initialContentFilter );
// Now, do a search to get all the entries
- SearchResultDone searchDoneResp = doSimpleSearch( session, request );
+ SearchResultDone searchDoneResp = doSimpleSearch( session, request, replicaLog );
if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
{
- replicaLog.setLastSentCsn( contextCsn );
+ if( replicaLog.getLastSentCsn() == null )
+ {
+ replicaLog.setLastSentCsn( contextCsn );
+ }
+
byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
if ( refreshNPersist ) // refreshAndPersist mode
{
- contextCsn = sendContentFromLog( session, request, replicaLog, contextCsn );
- cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
+ sendContentFromLog( session, request, replicaLog, contextCsn );
+ cookie = LdapProtocolUtils.createCookie(replicaLog.getId(), replicaLog.getLastSentCsn());
IntermediateResponse intermResp = new IntermediateResponseImpl( request.getMessageId() );
intermResp.setResponseName( SyncInfoValue.OID );
- SyncInfoValue syncInfo = new SyncInfoValueDecorator(
+ SyncInfoValue syncInfo = new SyncInfoValueDecorator(
ldapServer.getDirectoryService().getLdapCodecService(), SynchronizationInfoEnum.NEW_COOKIE );
syncInfo.setCookie( cookie );
- intermResp.setResponseValue( ( ( SyncInfoValueDecorator ) syncInfo ).getValue() );
+ intermResp.setResponseValue( ((SyncInfoValueDecorator)syncInfo).getValue() );
PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}", replicaLog, syncInfo );
@@ -498,8 +536,7 @@ public class SyncReplRequestHandler impl
ldapServer.getDirectoryService().getLdapCodecService() );
syncDone.setCookie( cookie );
searchDoneResp.addControl( syncDone );
- PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog,
- searchDoneResp );
+ PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog, searchDoneResp );
session.getIoSession().write( searchDoneResp );
}
@@ -511,7 +548,7 @@ public class SyncReplRequestHandler impl
.getResultCode() );
PROVIDER_LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult()
.getResultCode() );
- replicaLog.truncate();
+ replicaLog.stop();
replicaLog = null;
// remove the listener
@@ -521,7 +558,7 @@ public class SyncReplRequestHandler impl
}
// if all is well then store the consumer information
- replicaUtil.addConsumerEntry( replicaLog );
+ replicaUtil.addConsumerEntry(replicaLog );
// add to the map only after storing in the DIT, else the Replica update thread barfs
replicaLogMap.put( replicaLog.getId(), replicaLog );
@@ -532,7 +569,7 @@ public class SyncReplRequestHandler impl
* Process a search on the provider to get all the modified entries. We then send all
* of them to the consumer
*/
- private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req ) throws Exception
+ private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog ) throws Exception
{
SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
LdapResult ldapResult = searchDoneResp.getLdapResult();
@@ -562,7 +599,7 @@ public class SyncReplRequestHandler impl
LOG.debug( "using <{},{}> for size limit", requestLimit, serverLimit );
long sizeLimit = min( requestLimit, serverLimit );
- readResults( session, req, ldapResult, cursor, sizeLimit );
+ readResults( session, req, ldapResult, cursor, sizeLimit, replicaLog );
}
finally
{
@@ -587,10 +624,11 @@ public class SyncReplRequestHandler impl
* Process the results get from a search request. We will send them to the client.
*/
private void readResults( LdapSession session, SearchRequest req, LdapResult ldapResult,
- EntryFilteringCursor cursor, long sizeLimit ) throws Exception
+ EntryFilteringCursor cursor, long sizeLimit, ReplicaEventLog replicaLog ) throws Exception
{
long count = 0;
+
while ( ( count < sizeLimit ) && cursor.next() )
{
// Handle closed session
@@ -598,8 +636,7 @@ public class SyncReplRequestHandler impl
{
// The client has closed the connection
LOG.debug( "Request terminated for message {}, the client has closed the session", req.getMessageId() );
- PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed the session",
- req.getMessageId() );
+ PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed the session", req.getMessageId() );
break;
}
@@ -615,6 +652,9 @@ public class SyncReplRequestHandler impl
sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.ADD );
+ String lastSentCsn = entry.get( CSN_AT ).getString();
+ replicaLog.setLastSentCsn( lastSentCsn );
+
count++;
}
@@ -894,7 +934,8 @@ public class SyncReplRequestHandler impl
try
{
List<ReplicaEventLog> eventLogs = replicaUtil.getReplicaEventLogs();
-
+ List<String> eventLogNames = new ArrayList<String>();
+
if ( !eventLogs.isEmpty() )
{
for ( ReplicaEventLog replica : eventLogs )
@@ -902,6 +943,7 @@ public class SyncReplRequestHandler impl
LOG.debug( "initializing the replica log from {}", replica.getId() );
PROVIDER_LOG.debug( "initializing the replica log from {}", replica.getId() );
replicaLogMap.put( replica.getId(), replica );
+ eventLogNames.add( replica.getName() );
// update the replicaCount's value to assign a correct value to the new replica(s)
if ( replicaCount.get() < replica.getId() )
@@ -915,6 +957,17 @@ public class SyncReplRequestHandler impl
LOG.debug( "no replica logs found to initialize" );
PROVIDER_LOG.debug( "no replica logs found to initialize" );
}
+
+ // remove unused logs
+ File[] replicaLogNames = getAllReplJournalNames();
+ for( File f : replicaLogNames )
+ {
+ if( !eventLogNames.contains( f.getName() ) )
+ {
+ f.delete();
+ LOG.info( "removed unused replication event log {}", f );
+ }
+ }
}
catch ( Exception e )
{
@@ -946,9 +999,8 @@ public class SyncReplRequestHandler impl
{
LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(), log
.getId() );
- PROVIDER_LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(),
- log
- .getId() );
+ PROVIDER_LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(), log
+ .getId() );
}
}
}
@@ -1042,4 +1094,66 @@ public class SyncReplRequestHandler impl
return control.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST;
}
+
+
+ private File[] getAllReplJournalNames()
+ {
+ File replDir = dirService.getInstanceLayout().getReplDirectory();
+ FilenameFilter filter = new FilenameFilter()
+ {
+ @Override
+ public boolean accept( File dir, String name )
+ {
+ return name.startsWith( ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX );
+ }
+ };
+
+ return replDir.listFiles( filter );
+ }
+
+
+ /**
+ * an event listener for handling deletions of replication event log entries present under ou=consumers,ou=system
+ */
+ private class ConsumerLogEntryDeleteListener extends DirectoryListenerAdapter
+ {
+ @Override
+ public void entryDeleted( DeleteOperationContext deleteContext )
+ {
+ Dn consumerLogDn = deleteContext.getDn();
+ String name = ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX + consumerLogDn.getRdn().getValue().getString();
+ // lock this listener instance
+ synchronized ( this )
+ {
+ for( ReplicaEventLog log : replicaLogMap.values() )
+ {
+ if( name.equalsIgnoreCase( log.getName() ) )
+ {
+ synchronized ( log )
+ {
+ dirService.getEventService().removeListener( log.getPersistentListener() );
+ LOG.debug( "removed the persistent listener for replication event log {}", consumerLogDn );
+
+ replicaLogMap.remove( log.getId() );
+ try
+ {
+ // get the correct name, just incase cause we used equalsIgnoreCase
+ name = log.getName();
+ log.stop();
+
+ new File( dirService.getInstanceLayout().getReplDirectory(), name ).delete();
+ LOG.info( "successfully removed replication event log {}", consumerLogDn );
+ }
+ catch( Exception e )
+ {
+ LOG.warn( "Closing the replication event log of the entry {} was not successful, will be removed anyway", consumerLogDn, e );
+ }
+ }
+
+ break;
+ }
+ } // end of for
+ } // end of synchronized block
+ } // end of delete method
+ }
}
Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java?rev=1398782&r1=1398781&r2=1398782&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplSearchListener.java Tue Oct 16 13:27:19 2012
@@ -20,10 +20,12 @@
package org.apache.directory.server.ldap.replication.provider;
+import org.apache.directory.server.constants.ServerDNConstants;
import org.apache.directory.server.core.api.DirectoryService;
import org.apache.directory.server.core.api.entry.ClonedServerEntry;
import org.apache.directory.server.core.api.event.DirectoryListener;
import org.apache.directory.server.core.api.event.EventType;
+import org.apache.directory.server.core.api.interceptor.context.AbstractChangeOperationContext;
import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
@@ -67,7 +69,7 @@ public class SyncReplSearchListener impl
/** The ldap session */
private LdapSession session;
-
+
/** The search request we are processing */
private SearchRequest searchRequest;
@@ -76,8 +78,11 @@ public class SyncReplSearchListener impl
/** The consumer configuration */
private final ReplicaEventLog consumerMsgLog;
-
-
+
+ private static String replConsumerConfigDn = ServerDNConstants.REPL_CONSUMER_CONFIG_DN.toLowerCase();
+ private static String schemaDn = SchemaConstants.OU_SCHEMA.toLowerCase();
+ private static String replConsumerDn = ServerDNConstants.REPL_CONSUMER_DN_STR.toLowerCase();
+
/**
* Create a new instance of a consumer listener
*
@@ -114,7 +119,7 @@ public class SyncReplSearchListener impl
public void setSearchRequest( SearchRequest searchRequest )
{
this.searchRequest = searchRequest;
-
+
if ( searchRequest != null )
{
searchRequest.addAbandonListener( this );
@@ -122,7 +127,14 @@ public class SyncReplSearchListener impl
}
- /**
+ @Override
+ public boolean isSynchronous()
+ {
+ return true; // always synchronous
+ }
+
+
+ /**
* Abandon a SearchRequest
*
* @param searchRequest The SearchRequest to abandon
@@ -156,11 +168,11 @@ public class SyncReplSearchListener impl
}
}
-
+
/**
* Create the SyncStateValue control
*/
- private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry )
+ private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry )
throws LdapInvalidAttributeValueException
{
SyncStateValue syncStateValue = new SyncStateValueDecorator( directoryService.getLdapCodecService() );
@@ -169,15 +181,15 @@ public class SyncReplSearchListener impl
String uuidStr = entry.get( SchemaConstants.ENTRY_UUID_AT ).getString();
syncStateValue.setEntryUUID( Strings.uuidToBytes( uuidStr ) );
syncStateValue.setCookie( getCookie( entry ) );
-
+
return syncStateValue;
}
-
-
+
+
/**
* Send the result to the consumer. If the consumer has disconnected, we fail back to the queue.
*/
- private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType,
+ private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType,
SyncStateValue syncStateValue )
{
searchResultEntry.addControl( syncStateValue );
@@ -188,7 +200,7 @@ public class SyncReplSearchListener impl
// Now, send the entry to the consumer
handleWriteFuture( future, entry, eventType );
}
-
+
/**
* Process a ADD operation. The added entry is pushed to the consumer if it's connected,
@@ -199,13 +211,17 @@ public class SyncReplSearchListener impl
public void entryAdded( AddOperationContext addContext )
{
Entry entry = addContext.getEntry();
+
+ if ( isConfigEntry( entry ) || isNotValidForReplication( addContext ) )
+ {
+ return;
+ }
try
{
//System.out.println( "ADD Listener : log " + entry.getDn() );
// we log it first
- consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, ( ( ClonedServerEntry ) entry )
- .getClonedEntry() ) );
+ consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, ((ClonedServerEntry)entry).getClonedEntry() ) );
// We send the added entry directly to the consumer if it's connected
if ( pushInRealTime )
@@ -216,12 +232,11 @@ public class SyncReplSearchListener impl
resultEntry.setEntry( entry );
// Create the control which will be added to the response.
- SyncStateValue syncAdd = createControl( session.getCoreSession().getDirectoryService(),
- SyncStateTypeEnum.ADD, entry );
-
+ SyncStateValue syncAdd = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.ADD, entry );
+
sendResult( resultEntry, entry, EventType.ADD, syncAdd );
}
-
+
}
catch ( LdapInvalidAttributeValueException e )
{
@@ -240,9 +255,15 @@ public class SyncReplSearchListener impl
public void entryDeleted( DeleteOperationContext deleteContext )
{
Entry entry = deleteContext.getEntry();
+
+ if ( isConfigEntry( entry ) || isNotValidForReplication( deleteContext ) )
+ {
+ return;
+ }
+
sendDeletedEntry( entry );
}
-
+
/**
* A helper method, as the delete opertaionis used by the ModDN operations.
@@ -253,15 +274,14 @@ public class SyncReplSearchListener impl
{
//System.out.println( "DELETE Listener : log " + entry.getDn() );
consumerMsgLog.log( new ReplicaEventMessage( ChangeType.DELETE, entry ) );
-
+
if ( pushInRealTime )
{
SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
resultEntry.setObjectName( entry.getDn() );
resultEntry.setEntry( entry );
- SyncStateValue syncDelete = createControl( session.getCoreSession().getDirectoryService(),
- SyncStateTypeEnum.DELETE, entry );
+ SyncStateValue syncDelete = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.DELETE, entry );
sendResult( resultEntry, entry, EventType.DELETE, syncDelete );
}
@@ -284,11 +304,16 @@ public class SyncReplSearchListener impl
{
Entry alteredEntry = modifyContext.getAlteredEntry();
+ if ( isConfigEntry( alteredEntry ) || isNotValidForReplication( modifyContext ) )
+ {
+ return;
+ }
+
try
{
//System.out.println( "MODIFY Listener : log " + alteredEntry.getDn() );
consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODIFY, alteredEntry ) );
-
+
if ( pushInRealTime )
{
@@ -296,8 +321,7 @@ public class SyncReplSearchListener impl
resultEntry.setObjectName( modifyContext.getDn() );
resultEntry.setEntry( alteredEntry );
- SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
- SyncStateTypeEnum.MODIFY, alteredEntry );
+ SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODIFY, alteredEntry );
sendResult( resultEntry, alteredEntry, EventType.MODIFY, syncModify );
}
@@ -320,6 +344,11 @@ public class SyncReplSearchListener impl
// should always send the modified entry cause the consumer perform the modDn operation locally
Entry entry = moveContext.getModifiedEntry();
+ if ( isConfigEntry( entry ) || isNotValidForReplication( moveContext ) )
+ {
+ return;
+ }
+
try
{
if ( !moveContext.getNewSuperior().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
@@ -330,17 +359,16 @@ public class SyncReplSearchListener impl
//System.out.println( "MOVE Listener : log " + moveContext.getDn() + " moved to " + moveContext.getNewSuperior() );
consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
-
+
if ( pushInRealTime )
{
SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
resultEntry.setObjectName( moveContext.getDn() );
resultEntry.setEntry( entry );
- SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
- SyncStateTypeEnum.MODDN, entry );
+ SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
- sendResult( resultEntry, entry, null, syncModify );
+ sendResult( resultEntry, entry, EventType.MOVE, syncModify );
}
}
catch ( Exception e )
@@ -358,31 +386,36 @@ public class SyncReplSearchListener impl
*/
public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext )
{
+ // should always send the modified entry cause the consumer perform the modDn operation locally
+ Entry entry = moveAndRenameContext.getModifiedEntry();
+
+ if ( isConfigEntry( entry ) || isNotValidForReplication( moveAndRenameContext ) )
+ {
+ return;
+ }
+
try
{
if ( !moveAndRenameContext.getNewSuperiorDn().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase() ) )
{
- sendDeletedEntry( moveAndRenameContext.getEntry() );
+ sendDeletedEntry( entry );
return;
}
- // should always send the modified entry cause the consumer perform the modDn operation locally
- Entry entry = moveAndRenameContext.getModifiedEntry();
//System.out.println( "MOVE AND RENAME Listener : log " + moveAndRenameContext.getDn() +
// " moved to " + moveAndRenameContext.getNewSuperiorDn() + " renamed to " + moveAndRenameContext.getNewRdn() );
consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
-
+
if ( pushInRealTime )
{
SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
resultEntry.setObjectName( entry.getDn() );
resultEntry.setEntry( entry );
- SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
- SyncStateTypeEnum.MODDN, entry );
+ SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
- sendResult( resultEntry, entry, null, syncModify );
+ sendResult( resultEntry, entry, EventType.MOVE_AND_RENAME, syncModify );
}
}
catch ( Exception e )
@@ -403,25 +436,29 @@ public class SyncReplSearchListener impl
// should always send the modified entry cause the consumer perform the modDn operation locally
Entry entry = renameContext.getModifiedEntry();
+ if ( isConfigEntry( entry ) || isNotValidForReplication( renameContext ) )
+ {
+ return;
+ }
+
try
{
// should always send the original entry cause the consumer perform the modDn operation there
//System.out.println( "RENAME Listener : log " + renameContext.getDn() + " renamed to " + renameContext.getNewRdn() );
consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODDN, entry ) );
-
+
if ( pushInRealTime )
{
SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() );
resultEntry.setObjectName( entry.getDn() );
resultEntry.setEntry( entry );
- SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(),
- SyncStateTypeEnum.MODDN, entry );
-
+ SyncStateValue syncModify = createControl( session.getCoreSession().getDirectoryService(), SyncStateTypeEnum.MODDN, entry );
+
// In this case, the cookie is different
syncModify.setCookie( getCookie( entry ) );
- sendResult( resultEntry, entry, null, syncModify );
+ sendResult( resultEntry, entry, EventType.RENAME, syncModify );
}
}
catch ( Exception e )
@@ -469,34 +506,113 @@ public class SyncReplSearchListener impl
// Let the operation be executed.
// Note : we wait 10 seconds max
future.awaitUninterruptibly( 10000L );
-
+
if ( !future.isWritten() )
{
- LOG.error( "Failed to write to the consumer {} during the event {} on entry {}", new Object[]
- {
- consumerMsgLog.getId(), event, entry.getDn() } );
+ LOG.error( "Failed to write to the consumer {} during the event {} on entry {}", new Object[] {
+ consumerMsgLog.getId(), event, entry.getDn() } );
LOG.error( "", future.getException() );
// set realtime push to false, will be set back to true when the client
// comes back and sends another request this flag will be set to true
pushInRealTime = false;
}
+ else
+ {
+ try
+ {
+ // if successful update the last sent CSN
+ consumerMsgLog.setLastSentCsn( entry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
+ }
+ catch( Exception e )
+ {
+ //should never happen
+ LOG.error( "No entry CSN attribute found", e );
+ }
+ }
+ }
+
+
+ /**
+ * checks if the given entry belongs to the ou=config or ou=schema partition
+ * We don't replicate those two partitions
+ * @param entry the entry
+ * @return true if the entry belongs to ou=config partition, false otherwise
+ */
+ private boolean isConfigEntry( Entry entry )
+ {
+ // we can do Dn.isDescendantOf but in this part of the
+ // server the DNs are all normalized and a simple string compare should
+ // do the trick
+
+ String name = entry.getDn().getName().toLowerCase();
+
+ if ( name.endsWith( replConsumerConfigDn ) ||
+ name.endsWith( schemaDn ) ||
+ name.endsWith( replConsumerDn ) )
+ {
+ return true;
+ }
+
+ // do not replicate the changes made to transport config entries
+ if( name.startsWith( "ads-transportid" ) && name.endsWith( ServerDNConstants.CONFIG_DN ) )
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+
+ private boolean isNotValidForReplication( AbstractChangeOperationContext ctx )
+ {
+ if( ctx.isGenerateNoReplEvt() )
+ {
+ return true;
+ }
+
+ return isMmrConfiguredToReceiver( ctx );
}
-
/**
+ * checks if the sender of this replication event is setup with MMR
+ * (Note: this method is used to prevent sending a repicated event back to the sender after
+ * performing local update)
+ * @param ctx the operation's context
+ * @return true if the rid present in operation context is same as the event log's ID, false otherwise
+ */
+ private boolean isMmrConfiguredToReceiver( AbstractChangeOperationContext ctx )
+ {
+
+ if( ctx.isReplEvent() )
+ {
+ boolean skip = ( ctx.getRid() == consumerMsgLog.getId() );
+
+ if( skip )
+ {
+ LOG.debug( "RID in operation context matches with the ID of replication event log {} for host {}", consumerMsgLog.getName(), consumerMsgLog.getHostName() );
+ }
+
+ return skip;
+ }
+
+ return false;
+ }
+
+
+ /**
* {@inheritDoc}
*/
public String toString()
{
StringBuilder sb = new StringBuilder();
-
+
sb.append( "SyncReplSearchListener : \n" );
sb.append( '\'' ).append( searchRequest ).append( "', " );
sb.append( '\'' ).append( pushInRealTime ).append( "', \n" );
sb.append( consumerMsgLog );
sb.append( '\n' );
-
+
return sb.toString();
}
}