You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ak...@apache.org on 2006/11/05 02:25:52 UTC
svn commit: r471315 -
/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/
Author: akarasulu
Date: Sat Nov 4 17:25:51 2006
New Revision: 471315
URL: http://svn.apache.org/viewvc?view=rev&rev=471315
Log:
reformatting code
Modified:
directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java
directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java
directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java
Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java?view=diff&rev=471315&r1=471314&r2=471315
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java (original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java Sat Nov 4 17:25:51 2006
@@ -19,6 +19,7 @@
*/
package org.apache.directory.mitosis.service.protocol.handler;
+
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
@@ -56,6 +57,7 @@
import org.apache.directory.mitosis.store.ReplicationLogIterator;
import org.apache.directory.mitosis.store.ReplicationStore;
+
/**
* {@link ReplicationContextHandler} that implements client-side replication logic
* which sends any changes out-of-date to server.
@@ -63,43 +65,42 @@
* @author Trustin Lee
* @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $
*/
-public class ReplicationClientContextHandler implements
- ReplicationContextHandler
+public class ReplicationClientContextHandler implements ReplicationContextHandler
{
public void contextBegin( ReplicationContext ctx ) throws Exception
{
// Send a login message.
- LoginMessage m = new LoginMessage( ctx.getNextSequence(),
- ctx.getService().getConfiguration().getReplicaId() );
+ LoginMessage m = new LoginMessage( ctx.getNextSequence(), ctx.getService().getConfiguration().getReplicaId() );
ctx.getSession().write( m );
-
+
// Set write timeout
ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
-
+
// Check update vector of the remote peer every 5 seconds.
ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, 5 );
}
+
public void contextEnd( ReplicationContext ctx ) throws Exception
{
}
- public void messageReceived( ReplicationContext ctx, Object message )
- throws Exception
+
+ public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
{
ctx.cancelExpiration( ( ( BaseMessage ) message ).getSequence() );
-
- if( ctx.getState() == State.READY )
+
+ if ( ctx.getState() == State.READY )
{
- if( message instanceof LogEntryAckMessage )
+ if ( message instanceof LogEntryAckMessage )
{
onLogEntryAck( ctx, ( LogEntryAckMessage ) message );
}
- else if( message instanceof BeginLogEntriesAckMessage )
+ else if ( message instanceof BeginLogEntriesAckMessage )
{
onBeginLogEntriesAck( ctx, ( BeginLogEntriesAckMessage ) message );
}
- else if( message instanceof EndLogEntriesAckMessage )
+ else if ( message instanceof EndLogEntriesAckMessage )
{
// Do nothing
}
@@ -110,7 +111,7 @@
}
else
{
- if( message instanceof LoginAckMessage )
+ if ( message instanceof LoginAckMessage )
{
onLoginAck( ctx, ( LoginAckMessage ) message );
}
@@ -121,103 +122,98 @@
}
}
- public void messageSent( ReplicationContext ctx, Object message )
- throws Exception
+
+ public void messageSent( ReplicationContext ctx, Object message ) throws Exception
{
- if( message instanceof LogEntryMessage ||
- message instanceof LoginMessage )
+ if ( message instanceof LogEntryMessage || message instanceof LoginMessage )
{
ctx.scheduleExpiration( message );
}
}
- public void exceptionCaught( ReplicationContext ctx, Throwable cause )
- throws Exception
+
+ public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception
{
SessionLog.warn( ctx.getSession(), "Unexpected exception.", cause );
ctx.getSession().close();
}
- public void contextIdle( ReplicationContext ctx, IdleStatus status )
- throws Exception
+
+ public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
{
// If this client is logged in, all responses for sent messages
// (LogEntryMessages) are received, and no write request is pending,
// it means the previous replication process ended or this is the
// first replication attempt.
- if( ctx.getState() == State.READY &&
- ctx.getScheduledExpirations() == 0 &&
- ctx.getSession().getScheduledWriteRequests() == 0 )
+ if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() == 0
+ && ctx.getSession().getScheduledWriteRequests() == 0 )
{
beginReplication( ctx );
}
}
+
private void onLoginAck( ReplicationContext ctx, LoginAckMessage message )
{
- if( message.getResponseCode() != Constants.OK )
+ if ( message.getResponseCode() != Constants.OK )
{
- SessionLog.warn( ctx.getSession(),
- "Login attempt failed: " + message.getResponseCode() );
+ SessionLog.warn( ctx.getSession(), "Login attempt failed: " + message.getResponseCode() );
ctx.getSession().close();
return;
}
-
+
Iterator i = ctx.getConfiguration().getPeerReplicas().iterator();
- while( i.hasNext() )
+ while ( i.hasNext() )
{
Replica replica = ( Replica ) i.next();
- if( replica.getId().equals( message.getReplicaId() ) )
+ if ( replica.getId().equals( message.getReplicaId() ) )
{
- if( replica.getAddress().getAddress().equals(
- ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
+ if ( replica.getAddress().getAddress().equals(
+ ( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
{
ctx.setPeer( replica );
ctx.setState( State.READY );
-
+
beginReplication( ctx );
return;
}
else
{
- SessionLog.warn(
- ctx.getSession(),
- "Peer address mismatches: " +
- ctx.getSession().getRemoteAddress() +
- " (expected: " + replica.getAddress() );
+ SessionLog.warn( ctx.getSession(), "Peer address mismatches: "
+ + ctx.getSession().getRemoteAddress() + " (expected: " + replica.getAddress() );
ctx.getSession().close();
return;
}
}
}
-
- SessionLog.warn(
- ctx.getSession(),
- "Unknown peer replica ID: " + message.getReplicaId() );
+
+ SessionLog.warn( ctx.getSession(), "Unknown peer replica ID: " + message.getReplicaId() );
ctx.getSession().close();
}
+
private void beginReplication( ReplicationContext ctx )
{
// Initiate the replication process by asking for the update vector.
ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
}
+
private void onLogEntryAck( ReplicationContext ctx, LogEntryAckMessage message ) throws Exception
{
- if( message.getResponseCode() != Constants.OK )
+ if ( message.getResponseCode() != Constants.OK )
{
- SessionLog.warn(
- ctx.getSession(),
- "Remote peer failed to execute a log entry." );
+ SessionLog.warn( ctx.getSession(), "Remote peer failed to execute a log entry." );
ctx.getSession().close();
}
}
- private void onBeginLogEntriesAck( ReplicationContext ctx, BeginLogEntriesAckMessage message ) throws NamingException
+
+ private void onBeginLogEntriesAck( ReplicationContext ctx, BeginLogEntriesAckMessage message )
+ throws NamingException
{
// Start transaction only when the server says OK.
- if( message.getResponseCode() != Constants.OK )
+ if ( message.getResponseCode() != Constants.OK )
{
return;
}
@@ -229,60 +225,55 @@
{
myPV = store.getPurgeVector();
}
- catch( Exception e )
+ catch ( Exception e )
{
- SessionLog.warn(
- ctx.getSession(),
- "Failed to get update vector.", e );
+ SessionLog.warn( ctx.getSession(), "Failed to get update vector.", e );
ctx.getSession().close();
return;
}
-
+
// Do full-DIT transfer if the peer is new and I'm not new.
try
{
- if( myPV.size() > 0 && yourUV.size() == 0 )
+ if ( myPV.size() > 0 && yourUV.size() == 0 )
{
- SessionLog.warn(
- ctx.getSession(),
- "Starting a whole DIT transfer." );
+ SessionLog.warn( ctx.getSession(), "Starting a whole DIT transfer." );
sendAllEntries( ctx );
}
else
{
- SessionLog.warn(
- ctx.getSession(),
- "Starting a partial replication log transfer." );
+ SessionLog.warn( ctx.getSession(), "Starting a partial replication log transfer." );
sendReplicationLogs( ctx, myPV, yourUV );
}
}
finally
{
// Send EndLogEntries message to release the remote peer resources.
- ctx.getSession().write( new EndLogEntriesMessage ( ctx.getNextSequence() ) );
+ ctx.getSession().write( new EndLogEntriesMessage( ctx.getNextSequence() ) );
}
}
-
+
+
private void sendAllEntries( ReplicationContext ctx ) throws NamingException
{
Attributes rootDSE = ctx.getServiceConfiguration().getPartitionNexus().getRootDSE();
-
+
Attribute namingContextsAttr = rootDSE.get( "namingContexts" );
- if( namingContextsAttr == null || namingContextsAttr.size() == 0 )
+ if ( namingContextsAttr == null || namingContextsAttr.size() == 0 )
{
SessionLog.warn( ctx.getSession(), "No namingContexts attributes in rootDSE." );
return;
}
-
+
// Iterate all context partitions to send all entries of them.
NamingEnumeration e = namingContextsAttr.getAll();
- while( e.hasMore() )
+ while ( e.hasMore() )
{
Object value = e.next();
-
+
// Convert attribute value to JNDI name.
LdapDN contextName;
- if( value instanceof LdapDN )
+ if ( value instanceof LdapDN )
{
contextName = ( LdapDN ) value;
}
@@ -290,56 +281,55 @@
{
contextName = new LdapDN( String.valueOf( value ) );
}
-
+
SessionLog.info( ctx.getSession(), "Sending entries under '" + contextName + '\'' );
- Map mapping = ctx.getServiceConfiguration().getGlobalRegistries()
- .getAttributeTypeRegistry().getNormalizerMapping();
+ Map mapping = ctx.getServiceConfiguration().getGlobalRegistries().getAttributeTypeRegistry()
+ .getNormalizerMapping();
contextName.normalize( mapping );
sendAllEntries( ctx, contextName );
}
}
+
private void sendAllEntries( ReplicationContext ctx, LdapDN contextName ) throws NamingException
{
// Retrieve all subtree including the base entry
SearchControls ctrl = new SearchControls();
- ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
- NamingEnumeration e = ctx.getServiceConfiguration().getPartitionNexus().search(
- contextName,
- ctx.getServiceConfiguration().getEnvironment(),
- new PresenceNode( org.apache.directory.mitosis.common.Constants.OBJECT_CLASS_OID ), ctrl );
-
+ ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
+ NamingEnumeration e = ctx.getServiceConfiguration().getPartitionNexus().search( contextName,
+ ctx.getServiceConfiguration().getEnvironment(),
+ new PresenceNode( org.apache.directory.mitosis.common.Constants.OBJECT_CLASS_OID ), ctrl );
+
try
{
- while( e.hasMore() )
+ while ( e.hasMore() )
{
SearchResult sr = ( SearchResult ) e.next();
Attributes attrs = sr.getAttributes();
-
+
// Skip entries without entryCSN attribute.
Attribute entryCSNAttr = attrs.get( org.apache.directory.mitosis.common.Constants.ENTRY_CSN );
- if( entryCSNAttr == null )
+ if ( entryCSNAttr == null )
{
continue;
}
-
+
// Get entryCSN of the entry. Skip if entryCSN value is invalid.
CSN csn = null;
try
{
csn = new SimpleCSN( String.valueOf( entryCSNAttr.get() ) );
}
- catch( IllegalArgumentException ex )
+ catch ( IllegalArgumentException ex )
{
- SessionLog.warn( ctx.getSession(),
- "An entry with improper entryCSN: " + sr.getName() );
+ SessionLog.warn( ctx.getSession(), "An entry with improper entryCSN: " + sr.getName() );
continue;
}
-
+
// Convert the entry into AddEntryOperation log.
Operation op = new AddEntryOperation( csn, new LdapDN( sr.getName() ), attrs );
-
+
// Send a LogEntry message for the entry.
ctx.getSession().write( new LogEntryMessage( ctx.getNextSequence(), op ) );
}
@@ -350,28 +340,28 @@
}
}
+
private void sendReplicationLogs( ReplicationContext ctx, CSNVector myPV, CSNVector yourUV )
{
Iterator i = myPV.getReplicaIds().iterator();
- while( i.hasNext() )
+ while ( i.hasNext() )
{
ReplicaId replicaId = ( ReplicaId ) i.next();
CSN myCSN = myPV.getCSN( replicaId );
CSN yourCSN = yourUV.getCSN( replicaId );
- if( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) )
+ if ( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) )
{
- SessionLog.warn(
- ctx.getSession(),
- "Remote update vector (" + yourUV + ") is out-of-date. Full replication is required." );
+ SessionLog.warn( ctx.getSession(), "Remote update vector (" + yourUV
+ + ") is out-of-date. Full replication is required." );
ctx.getSession().close();
return;
}
}
-
+
ReplicationLogIterator logIt = ctx.getConfiguration().getStore().getLogs( yourUV, false );
try
{
- while( logIt.next() )
+ while ( logIt.next() )
{
Operation op = logIt.getOperation();
ctx.getSession().write( new LogEntryMessage( ctx.getNextSequence(), op ) );
@@ -382,12 +372,11 @@
logIt.close();
}
}
-
+
+
private void onUnexpectedMessage( ReplicationContext ctx, Object message )
{
- SessionLog.warn(
- ctx.getSession(),
- "Unexpected message: " + message );
+ SessionLog.warn( ctx.getSession(), "Unexpected message: " + message );
ctx.getSession().close();
}
}
Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java?view=diff&rev=471315&r1=471314&r2=471315
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java (original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientProtocolHandler.java Sat Nov 4 17:25:51 2006
@@ -19,10 +19,11 @@
*/
package org.apache.directory.mitosis.service.protocol.handler;
+
import org.apache.directory.mitosis.service.ReplicationService;
-public class ReplicationClientProtocolHandler extends
- ReplicationProtocolHandler
+
+public class ReplicationClientProtocolHandler extends ReplicationProtocolHandler
{
public ReplicationClientProtocolHandler( ReplicationService service )
{
Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java?view=diff&rev=471315&r1=471314&r2=471315
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java (original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java Sat Nov 4 17:25:51 2006
@@ -19,6 +19,7 @@
*/
package org.apache.directory.mitosis.service.protocol.handler;
+
import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
import org.apache.directory.mitosis.service.ReplicationContext;
import org.apache.directory.mitosis.service.ReplicationService;
@@ -28,19 +29,18 @@
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
+
public class ReplicationProtocolHandler implements IoHandler
{
private static final String CONTEXT = "context";
-
+
private final ReplicationService service;
private final ReplicationConfiguration configuration;
private final DirectoryServiceConfiguration serviceCfg;
private final ReplicationContextHandler contextHandler;
- public ReplicationProtocolHandler(
- ReplicationService service,
- ReplicationContextHandler contextHandler )
+ public ReplicationProtocolHandler( ReplicationService service, ReplicationContextHandler contextHandler )
{
assert service != null;
assert contextHandler != null;
@@ -50,32 +50,38 @@
this.serviceCfg = service.getFactoryConfiguration();
this.contextHandler = contextHandler;
}
-
+
+
private ReplicationContext getContext( IoSession session )
{
return ( ReplicationContext ) session.getAttribute( CONTEXT );
}
-
+
+
public void sessionCreated( IoSession session ) throws Exception
{
session.setAttribute( CONTEXT, new SimpleReplicationContext( service, serviceCfg, configuration, session ) );
}
+
public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
{
contextHandler.exceptionCaught( getContext( session ), cause );
}
+
public void messageReceived( IoSession session, Object message ) throws Exception
{
contextHandler.messageReceived( getContext( session ), message );
}
+
public void messageSent( IoSession session, Object message ) throws Exception
{
contextHandler.messageSent( getContext( session ), message );
}
+
public void sessionClosed( IoSession session ) throws Exception
{
ReplicationContext ctx = getContext( session );
@@ -83,10 +89,12 @@
ctx.cancelAllExpirations();
}
+
public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
{
contextHandler.contextIdle( getContext( session ), status );
}
+
public void sessionOpened( IoSession session ) throws Exception
{
Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java?view=diff&rev=471315&r1=471314&r2=471315
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java (original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerProtocolHandler.java Sat Nov 4 17:25:51 2006
@@ -19,10 +19,11 @@
*/
package org.apache.directory.mitosis.service.protocol.handler;
+
import org.apache.directory.mitosis.service.ReplicationService;
-public class ReplicationServerProtocolHandler extends
- ReplicationProtocolHandler
+
+public class ReplicationServerProtocolHandler extends ReplicationProtocolHandler
{
public ReplicationServerProtocolHandler( ReplicationService service )
{