You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ma...@apache.org on 2007/08/01 22:52:37 UTC

svn commit: r561945 - /directory/apacheds/trunk/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java

Author: malderson
Date: Wed Aug  1 13:52:34 2007
New Revision: 561945

URL: http://svn.apache.org/viewvc?view=rev&rev=561945
Log:
Fix for DIRSERVER-998, where replication messages were sometimes timing out as we tried to remove the timeout timer before it had even been set.

Modified:
    directory/apacheds/trunk/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java

Modified: directory/apacheds/trunk/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java?view=diff&rev=561945&r1=561944&r2=561945
==============================================================================
--- directory/apacheds/trunk/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java (original)
+++ directory/apacheds/trunk/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java Wed Aug  1 13:52:34 2007
@@ -58,6 +58,7 @@
 import org.apache.directory.shared.ldap.filter.PresenceNode;
 import org.apache.directory.shared.ldap.name.LdapDN;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.WriteFuture;
 import org.apache.mina.util.SessionLog;
 
 
@@ -122,7 +123,7 @@
     {
         // Send a login message.
         LoginMessage m = new LoginMessage( ctx.getNextSequence(), ctx.getService().getConfiguration().getReplicaId() );
-        ctx.getSession().write( m );
+        writeTimeLimitedMessage( ctx, m );
 
         // Set write timeout
         ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
@@ -178,10 +179,20 @@
 
     public void messageSent( ReplicationContext ctx, Object message ) throws Exception
     {
-        if ( message instanceof LogEntryMessage || message instanceof LoginMessage )
-        {
-            ctx.scheduleExpiration( message );
-        }
+    }
+    
+    
+    /**
+     * A helper to write a message and schedule that message for expiration.
+     *
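+     * @param ctx the replication context that schedules the expiration and provides the session to write to
+     * @param message the message to schedule for expiration and then write to the session
+     * @return the {@link WriteFuture} returned by the session's write call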
+     */
+    public WriteFuture writeTimeLimitedMessage( ReplicationContext ctx, Object message )
+    {
+        ctx.scheduleExpiration( message );
+        return ctx.getSession().write( message );
     }
 
 
@@ -245,11 +256,23 @@
             && ctx.getSession().getScheduledWriteRequests() == 0 )
         {
         	// Initiate replication process asking update vector.
+            if ( SessionLog.isDebugEnabled( ctx.getSession() ) ) {
+                SessionLog.debug( ctx.getSession(), "(" +
+                    ctx.getConfiguration().getReplicaId().getId() + "->" +
+                    (ctx.getPeer() != null ? ctx.getPeer().getId().getId() : "null") +
+                    ") Beginning replication. " );
+            }
         	ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
         	return true;
         }
         else
         {
+            if ( SessionLog.isDebugEnabled( ctx.getSession() ) ) {
+                SessionLog.debug( ctx.getSession(), "(" +
+                    ctx.getConfiguration().getReplicaId().getId() + "->" +
+                    (ctx.getPeer() != null ? ctx.getPeer().getId().getId() : "null") +
+                    ") Couldn't begin replication.  State:" + ctx.getState() + ", scheduledExpirations:" + ctx.getScheduledExpirations() + ", scheduledWriteRequests:" + ctx.getSession().getScheduledWriteRequests() );
+            }
         	return false;
         }
     }
@@ -391,7 +414,7 @@
                 Operation op = new AddEntryOperation( csn, dn, attrs );
 
                 // Send a LogEntry message for the entry.
-                ctx.getSession().write( new LogEntryMessage( ctx.getNextSequence(), op ) );
+                writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
             }
         }
         finally
@@ -425,7 +448,7 @@
             while ( logIt.next() )
             {
                 Operation op = logIt.getOperation();
-                ctx.getSession().write( new LogEntryMessage( ctx.getNextSequence(), op ) );
+                writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
             }
         }
         finally