You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/06/09 00:36:23 UTC

svn commit: r412871 - in /tomcat/container/tc5.5.x/modules/groupcom: doc/ src/share/org/apache/catalina/tribes/group/ src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/membership/ src/share/org/apache/catalin...

Author: fhanik
Date: Thu Jun  8 15:36:22 2006
New Revision: 412871

URL: http://svn.apache.org/viewvc?rev=412871&view=rev
Log:
Almost there; only debugging of the code and the algorithm is left

Added:
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia
    tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
Binary files - no diff available.

Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
Binary files - no diff available.

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Thu Jun  8 15:36:22 2006
@@ -59,7 +59,7 @@
      * If <code>heartbeat == true</code> then how often do we want this
      * heartbeat to run. default is one minute
      */
-    protected long heartbeatSleeptime = 60*1000;//only run once a minute
+    protected long heartbeatSleeptime = 5*1000;//every 5 seconds
 
     /**
      * Internal heartbeat thread

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Thu Jun  8 15:36:22 2006
@@ -185,10 +185,15 @@
     
     public void startElection(boolean force) throws ChannelException {
         synchronized (electionMutex) {
-            if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them
             MemberImpl local = (MemberImpl)getLocalMember(false);
             MemberImpl[] others = (MemberImpl[])membership.getMembers();
-            if ( others.length == 0 ) return; //the only member, no need for an election
+            if ( others.length == 0 ) {
+                this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
+                this.view = new Membership(local,AbsoluteOrder.comp, true);
+                this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
+                return; //the only member, no need for an election
+            }
+            if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them
             int prio = AbsoluteOrder.comp.compare(local,others[0]);
             MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
             if ( local.equals(leader) || force ) {
@@ -225,9 +230,9 @@
         super.sendMessage(new Member[] {next}, createData(msg, local), null);
     }
     
-    protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) { 
+    protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { 
         int next = Arrays.nextIndex(local,msg.getMembers());
-        if ( AbsoluteOrder.comp.compare(local,msg.getLeader()) > 0 ) msg.leader = local;
+        msg.leader = msg.getMembers()[0];
         if ( next >= 0 ) sendElectionMsg(local,(MemberImpl)msg.getMembers()[next],msg);
     }
     
@@ -305,15 +310,29 @@
                 handleViewConf(msg,local,merged);
             } else {
                 //membership change
-                
+                suggestedView = new Membership(local,AbsoluteOrder.comp,true);
+                suggestedviewId = msg.getId();
+                Arrays.fill(suggestedView,merged.getMembers());
+                msg.view = (MemberImpl[])merged.getMembers();
+                sendElectionMsgToNextInline(local,msg);
             }
         } else {
             //leadership change
+            suggestedView = null;
+            suggestedviewId = null;
+            msg.view = (MemberImpl[])merged.getMembers();
+            sendElectionMsgToNextInline(local,msg);
         }
     }
     
     protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
-        
+        if ( local.equals(msg.getLeader()) ) {
+            //I am the new leader
+            startElection(false);
+        } else {
+            msg.view = (MemberImpl[])merged.getMembers();
+            sendElectionMsgToNextInline(local,msg);
+        }
     }
     
     protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
@@ -326,6 +345,12 @@
             suggestedView = null;
             suggestedviewId = null;
         }
+        
+        if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) {
+            suggestedView = null;
+            suggestedviewId = null;
+        }
+        
         viewChange(viewId,view.getMembers());
         
         if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
@@ -381,9 +406,9 @@
 //              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
 //============================================================================================================
     public void start(int svc) throws ChannelException {
-        if ( membership == null ) setupMembership();
         if (started)return;
         super.start(startsvc);
+        if ( membership == null ) setupMembership();
         startElection(false);
         started = true;
     }
@@ -392,8 +417,8 @@
         try {
             halt();
             if ( !started ) return;
-            super.stop(startsvc);
             started = false;
+            super.stop(startsvc);
         }finally {
             release();
         }
@@ -429,7 +454,7 @@
 
     public void memberAdded(Member member,boolean elect) {
         try {
-            
+            if ( membership == null ) setupMembership();
             if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
             try {
                 if (started && elect) startElection(false);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java Thu Jun  8 15:36:22 2006
@@ -77,7 +77,7 @@
      */
     public Membership(MemberImpl local, boolean includeLocal) {
         this.local = local;
-        this.addMember(local);
+        if ( includeLocal ) addMember(local);
     }
 
     public Membership(MemberImpl local) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java Thu Jun  8 15:36:22 2006
@@ -54,7 +54,7 @@
     private boolean soKeepAlive = false;
     private boolean ooBInline = true;
     private boolean soReuseAddress = true;
-    private boolean soLingerOn = true;
+    private boolean soLingerOn = false;
     private int soLingerTime = 3;
     private int soTrafficClass = 0x04 | 0x08 | 0x010;
     private boolean throwOnFailedAck = false;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Thu Jun  8 15:36:22 2006
@@ -27,6 +27,7 @@
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ChannelData;
 import org.apache.catalina.tribes.io.BufferPool;
+import java.nio.channels.CancelledKeyException;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -79,7 +80,9 @@
             } catch (Exception e) {
                 //this is common, since the sockets on the other
                 //end expire after a certain time.
-                if ( e instanceof IOException ) {
+                if ( e instanceof CancelledKeyException ) {
+                    //do nothing
+                } else if ( e instanceof IOException ) {
                     //dont spew out stack traces for IO exceptions unless debug is enabled.
                     if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e);
                     else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.");
@@ -187,11 +190,15 @@
         //acquire the interestOps mutex
         Object mutex = this.getPool().getInterestOpsMutex();
         synchronized (mutex) {
-            // cycle the selector so this key is active again
-            key.selector().wakeup();
-            // resume interest in OP_READ, OP_WRITE
-            int resumeOps = key.interestOps() | SelectionKey.OP_READ;
-            key.interestOps(resumeOps);
+            try {
+                // cycle the selector so this key is active again
+                key.selector().wakeup();
+                // resume interest in OP_READ, OP_WRITE
+                int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+                key.interestOps(resumeOps);
+            }catch ( Exception x ) {
+                log.error("Unable to cycle the selector, connection disconnected?",x);
+            }
         }
         
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java Thu Jun  8 15:36:22 2006
@@ -37,7 +37,7 @@
  * @version 1.0
  */
 public class PooledParallelSender extends PooledSender implements MultiPointSender {
-    protected boolean connected = false;
+    protected boolean connected = true;
     public PooledParallelSender() {
         super();
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=412871&r1=412870&r2=412871&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Thu Jun  8 15:36:22 2006
@@ -34,9 +34,9 @@
     public static boolean contains(byte[] source, int srcoffset, byte[] key, int keyoffset, int length) {
         if ( srcoffset < 0 || srcoffset >= source.length) throw new ArrayIndexOutOfBoundsException("srcoffset is out of bounds.");
         if ( keyoffset < 0 || keyoffset >= key.length) throw new ArrayIndexOutOfBoundsException("keyoffset is out of bounds.");
-        if ( length >= (key.length-keyoffset) ) throw new ArrayIndexOutOfBoundsException("not enough data elements in the key, length is out of bounds.");
+        if ( length > (key.length-keyoffset) ) throw new ArrayIndexOutOfBoundsException("not enough data elements in the key, length is out of bounds.");
         //we don't have enough data to validate it
-        if ( length >= (source.length-srcoffset) ) return false;
+        if ( length > (source.length-srcoffset) ) return false;
         boolean match = true;
         int pos = keyoffset;
         for ( int i=srcoffset; match && i<length; i++ ) {
@@ -116,7 +116,8 @@
     }
     
     public static Member[] extract(Member[] all, Member[] remove) {
-        List list = java.util.Arrays.asList(all);
+        List alist = java.util.Arrays.asList(all);
+        ArrayList list = new ArrayList(alist);
         for (int i=0; i<remove.length; i++ ) list.remove(remove[i]);
         return (Member[])list.toArray(new Member[list.size()]);
     }

Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java?rev=412871&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Thu Jun  8 15:36:22 2006
@@ -0,0 +1,82 @@
+package org.apache.catalina.tribes.test.interceptors;
+
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+import junit.framework.TestCase;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.Member;
+import junit.framework.TestSuite;
+import junit.framework.TestResult;
+/** JUnit 3 style test (extends TestCase): starts several GroupChannels, each with a NonBlockingCoordinator, and asserts that the coordinators report the same coordinator member. */
+public class TestNonBlockingCoordinator extends TestCase {
+
+    GroupChannel[] channels = null; // channels under test; populated in setUp
+    NonBlockingCoordinator[] coordinators = null; // coordinators[i] is the interceptor added to channels[i]
+    int channelCount = 6; // number of channels (and coordinators, and starter threads) created in setUp
+    Thread[] threads = null; // threads[i] starts channels[i]; created and joined in setUp
+    protected void setUp() throws Exception { // builds channelCount channels and starts them concurrently, one thread per channel
+        super.setUp();
+        channels = new GroupChannel[channelCount];
+        coordinators = new NonBlockingCoordinator[channelCount];
+        threads = new Thread[channelCount];
+        for ( int i=0; i<channelCount; i++ ) {
+            channels[i] = new GroupChannel();
+            coordinators[i] = new NonBlockingCoordinator();
+            channels[i].addInterceptor(coordinators[i]); // coordinator is added before the failure detector
+            channels[i].addInterceptor(new TcpFailureDetector());
+            final int j = i; // final copy of the index so the anonymous Thread below can capture it
+            threads[i] = new Thread() {
+                public void run() {
+                    try {
+                        channels[j].start(Channel.DEFAULT);
+                        Thread.sleep(50);
+                    } catch (Exception x) {
+                        x.printStackTrace(); // a failed start is only printed; it does not fail setUp
+                    }
+                }
+            };
+        }
+        for ( int i=0; i<channelCount; i++ ) threads[i].start();
+        for ( int i=0; i<channelCount; i++ ) threads[i].join(); // wait until every start attempt (plus its 50 ms sleep) has finished
+    }
+    // Checks that adjacent channels report the same number of members and that all coordinators return an equal coordinator.
+    public void testCoord1() throws Exception {
+        for (int i=1; i<channelCount; i++ ) 
+            assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length); // compares member counts of adjacent channels (the message text says "Message count")
+
+        Member member = coordinators[0].getCoordinator();
+        int cnt = 0;
+        while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){} // polls up to 100 times, 100 ms apart, until coordinators[0] reports a non-null coordinator; exceptions are ignored
+        for (int i=0; i<channelCount; i++ ) super.assertEquals(member,coordinators[i].getCoordinator()); // member may still be null here if polling ran out
+        System.out.println("Coordinator[1] is:"+member);
+    }
+    // Stops channels[0], then checks that coordinators 1..channelCount-1 return an equal coordinator.
+    public void testCoord2() throws Exception {
+        channels[0].stop(Channel.DEFAULT);
+        if ( channelCount > 3 ) channels[channelCount-1].start(Channel.DEFAULT); // NOTE(review): calls start again on the last channel, which setUp already started; intent is not clear from this file
+        Thread.sleep(1000); // fixed 1 s wait before reading the coordinators
+        System.out.println("Member count:"+channels[1].getMembers().length);
+        Member member = coordinators[1].getCoordinator();
+        for (int i = 1; i < channelCount; i++)super.assertEquals(member, coordinators[i].getCoordinator()); // index 0 is skipped because channels[0] was stopped above
+        Thread.sleep(3000); // NOTE(review): sleep after the assertions; its purpose is not evident here
+        System.out.println("Coordinator[2] is:" + member);
+
+    }
+    // Stops every channel created in setUp.
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        for ( int i=0; i<channelCount; i++ ) {
+            channels[i].stop(Channel.DEFAULT); // also calls stop on channels[0] again after testCoord2 has already stopped it
+        }
+    }
+    // Command-line entry point: runs this class's tests in a TestSuite.
+    public static void main(String[] args) throws Exception {
+        TestSuite suite = new TestSuite();
+        suite.addTestSuite(TestNonBlockingCoordinator.class);
+        suite.run(new TestResult()); // the TestResult is not kept or printed, so main itself reports no failures
+    }
+    
+    
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org