You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/23 22:23:51 UTC

svn commit: r409000 - in /tomcat/container/tc5.5.x/modules/groupcom: doc/ src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/util/

Author: fhanik
Date: Tue May 23 13:23:49 2006
New Revision: 409000

URL: http://svn.apache.org/viewvc?rev=409000&view=rev
Log:
Slowly implementing to match the state diagram

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia
    tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
Binary files - no diff available.

Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
Binary files - no diff available.

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue May 23 13:23:49 2006
@@ -30,6 +30,7 @@
 import org.apache.catalina.tribes.Channel;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import org.apache.catalina.tribes.membership.*;
 
 /**
  * <p>Title: NonBlockingCoordinator</p>
@@ -132,6 +133,17 @@
     protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
     
     /**
+     * Alive message
+     */
+    protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
+                                                            -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
+                                                            74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
+                                                            119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
+    /**
+     * Time to wait for coordination timeout
+     */
+    protected long waitForCoordMsgTimeout = 5000;
+    /**
      * Our current view
      */
     protected Membership view = null;
@@ -163,17 +175,11 @@
     }
     
     public void start(int svc) throws ChannelException {
-        try {
-            halt();
-            if ( started ) return;
-            super.start(startsvc);
-            started = true;
-            
-        }finally {
-            release();
-        }
-        //coordination can happen before this line of code executes
-        Member local = getLocalMember(false);
+        if ( membership == null ) setupMembership();
+        if (started)return;
+        super.start(startsvc);
+        startElection(false);
+        started = true;
     }
     
     public void stop(int svc) throws ChannelException {
@@ -191,78 +197,82 @@
         return (Membership)rotatingViews.get(id);
     }
     
-    public void elect() {
+    public void startElection(boolean force) throws ChannelException {
         synchronized (electionMutex) {
-            try {
-                Member[] mbrs = super.getMembers();
-                //no members, exit
-                if ( mbrs.length == 0 ) return;
-                AbsoluteOrder.absoluteOrder(mbrs);
-                MemberImpl local = (MemberImpl)getLocalMember(false);
-                //I'm not the higest, exit
-                if ( !local.equals(mbrs[0]) ) return;
-                //I'm already running an election
-                if ( suggestedviewId != null ) return;
-                //create a suggestedview
-                suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true));
-                Membership suggestedview = new Membership((MemberImpl)local,AbsoluteOrder.comp);
-                rotatingViews.put(suggestedviewId,suggestedview);
-                suggestedview.addMember((MemberImpl)local);
-                Arrays.fill(suggestedview,mbrs);
-                suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true));
-                CoordinationMessage msg = new CoordinationMessage(local,local,suggestedview.getMembers(),suggestedviewId,COORD_REQUEST);
-                for (int i=0; i<mbrs.length; i++ ) {
-                    try {
-                        sendMessage(msg,mbrs[i]);
-                        break;
-                    } catch ( ChannelException x ) {
-                        log.error("Unable to send election message, trying next node.",x);
-                    }
+            if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them
+            MemberImpl local = (MemberImpl)getLocalMember(false);
+            MemberImpl[] others = (MemberImpl[])membership.getMembers();
+            if ( others.length == 0 ) return; //the only member, no need for an election
+            int prio = AbsoluteOrder.comp.compare(local,others[0]);
+            MemberImpl leader = ( prio < 0 )?local:others[0];
+            if ( local.equals(leader) || force ) sendElectionMsg(local,leader,others);
+            else {
+                try {
+                    electionMutex.wait(waitForCoordMsgTimeout);
+                }catch ( InterruptedException x ) {
+                    Thread.currentThread().interrupted();
                 }
-                halt();
-            } finally {
-                //dont release, election running
-                //release will happen on processCoordMessage
-            }
+                if ( rotatingViews.size() == 0 ) {
+                    //no message arrived, send the coord msg
+                    startElection(true);
+                }
+            }//end if
+        }
+    }
+
+    protected void sendElectionMsg(MemberImpl local, MemberImpl leader, MemberImpl[] others) throws ChannelException {
+        synchronized (electionMutex) {
+            if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them
+            Membership m = new Membership(local,AbsoluteOrder.comp);
+            m.addMember(local);
+            Arrays.fill(m,others);
+            MemberImpl[] mbrs = m.getMembers();
+            CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
+            suggestedviewId = msg.getId();
+            rotatingViews.put(suggestedviewId, msg);
+            super.sendMessage(new Member[] {others[0]}, createData(msg, local), null);
         }
     }
     
+    public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
+        ChannelData data = new ChannelData(true);
+        data.setAddress(local);
+        data.setMessage(msg.getBuffer());
+        data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
+        data.setTimestamp(System.currentTimeMillis());
+        return data;
+    }
+    
     protected void viewChange(UniqueId viewId, Member[] view) {
         //invoke any listeners
     }
     
+    protected boolean alive(Member mbr) {
+        return TcpFailureDetector.memberAlive(mbr,
+                                              COORD_ALIVE,
+                                              false,
+                                              false,
+                                              waitForCoordMsgTimeout,
+                                              waitForCoordMsgTimeout,
+                                              getOptionFlag());
+    }
+    
+    protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
+        
+        MemberImpl local = (MemberImpl)getLocalMember(false);
+        Membership merged = new Membership(local,AbsoluteOrder.comp);
+        Arrays.fill(merged,msg.getMembers());
+        Arrays.fill(merged,getMembers());
+        Member[] diff = Arrays.diff(merged,membership,local);
+        for ( int i=0; i<diff.length; i++ ) {
+            if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
+        }
+        return merged;
+    }
+    
     protected void processCoordMessage(CoordinationMessage msg, Member sender) {
         synchronized (electionMutex) {
-            MemberImpl local = (MemberImpl) getLocalMember(false);
-            if (suggestedviewId != null) {
-                //we are running our own election
-                if (suggestedviewId.equals(msg.getId())) {
-                    //we received our own token
-                    Membership suggestedview = getView(msg.getId());
-                    Member[] suggested = suggestedview.getMembers();
-                    Member[] received = msg.getMembers();
-                    if (Arrays.sameMembers(suggested,received) ) {
-                        //we completed the loop
-                        view = suggestedview;
-                        viewId = suggestedviewId;
-                        suggestedviewId = null;
-                        rotatingViews.remove(viewId);
-                        suggestedview.reset();
-                        viewChange(viewId,view.getMembers());
-                        release();
-                    } else  {  
-                        //view or leadership changed
-                        if ( !local.equals(msg.getLeader()) ) {
-                            //leadership changed
-                        } else {
-                            //leadership didn't change
-                            //circulate it again
-                        }
-                    }
-                }
-            } else {
-
-            }
+            msg.timestamp = System.currentTimeMillis();
         }
     }
 
@@ -289,7 +299,9 @@
     }
 
     public void messageReceived(ChannelMessage msg) {
-        if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
+        if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
+            //ignore message, its an alive message
+        } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
             processCoordMessage(new CoordinationMessage(msg.getMessage()),msg.getAddress());
         } else {
             super.messageReceived(msg);
@@ -302,23 +314,31 @@
 
     public void memberAdded(Member member) {
         try {
-            if ( membership == null ) setupMembership();
+            
             if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
-            halt();
+            try {
+                if (started) startElection(false);
+            }catch ( ChannelException x ) {
+                log.error("Unable to start election when member was added.",x);
+            }
         }finally {
-            release();
         }
         
     }
 
     public void memberDisappeared(Member member) {
         try {
-            halt();
-            if ( started ) elect();
+            
+            membership.removeMember((MemberImpl)member);
+            super.memberDisappeared(member);
+            try {
+                if (started) startElection(false);
+            }catch ( ChannelException x ) {
+                log.error("Unable to start election when member was removed.",x);
+            }
         }finally {
-            release();
         }
-        super.memberDisappeared(member);
+        
     }
 
     public void heartbeat() {
@@ -329,7 +349,7 @@
      * has members
      */
     public boolean hasMembers() {
-        if ( membership == null ) setupMembership();
+        
         return membership.hasMembers();
     }
 
@@ -338,8 +358,8 @@
      * @return all members or empty array
      */
     public Member[] getMembers() {
-        if ( membership == null ) setupMembership();
-        throw new UnsupportedOperationException("Not yet implemented");
+        
+        return membership.getMembers();
     }
 
     /**
@@ -348,8 +368,8 @@
      * @return Member
      */
     public Member getMember(Member mbr) {
-        if ( membership == null ) setupMembership();
-        throw new UnsupportedOperationException("Not yet implemented");
+        
+        return membership.getMember(mbr);
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Tue May 23 13:23:49 2006
@@ -58,7 +58,7 @@
     
     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
     
-    protected static byte[] testMessage = new byte[] {
+    protected static byte[] TCP_FAIL_DETECT = new byte[] {
         79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
         125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
         55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
@@ -100,8 +100,8 @@
         boolean process = true;
         if ( okToProcess(msg.getOptions()) ) {
             //check to see if it is a testMessage, if so, process = false
-            process = ( (msg.getMessage().getLength() != testMessage.length) ||
-                        (!Arrays.equals(testMessage,msg.getMessage().getBytes()) ) );
+            process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
+                        (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
         }//end if
             
         //ignore the message, it doesnt have the flag set
@@ -218,8 +218,14 @@
         
     }
     
-    
     protected boolean memberAlive(Member mbr) {
+        return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
+    }
+    
+    protected static boolean memberAlive(Member mbr, byte[] msgData, 
+                                         boolean sendTest, boolean readTest,
+                                         long readTimeout, long conTimeout,
+                                         int optionFlag) {
         //could be a shutdown notification
         if ( Arrays.equals(mbr.getPayload(),Member.SHUTDOWN_PAYLOAD) ) return false;
         
@@ -227,20 +233,20 @@
         try {
             InetAddress ia = InetAddress.getByAddress(mbr.getHost());
             InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
-            socket.setSoTimeout((int)readTestTimeout);
-            socket.connect(addr, (int) connectTimeout);
-            if ( performSendTest ) {
+            socket.setSoTimeout((int)readTimeout);
+            socket.connect(addr, (int) conTimeout);
+            if ( sendTest ) {
                 ChannelData data = new ChannelData(true);
                 data.setAddress(mbr);
-                data.setMessage(new XByteBuffer(testMessage,false));
+                data.setMessage(new XByteBuffer(msgData,false));
                 data.setTimestamp(System.currentTimeMillis());
-                int options = getOptionFlag() | Channel.SEND_OPTIONS_BYTE_MESSAGE;
-                if ( performReadTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
+                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
+                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
                 else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
                 data.setOptions(options);
                 byte[] message = XByteBuffer.createDataPackage(data);
                 socket.getOutputStream().write(message);
-                if ( performReadTest ) {
+                if ( readTest ) {
                     int length = socket.getInputStream().read(message);
                     return length > 0;
                 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Tue May 23 13:23:49 2006
@@ -101,6 +101,17 @@
         for (int i=0; i<m.length; i++ ) mbrship.addMember((MemberImpl)m[i]);
     }
     
+    public static Member[] diff(Membership complete, Membership local, MemberImpl ignore) {
+        ArrayList result = new ArrayList();
+        MemberImpl[] comp = complete.getMembers();
+        for ( int i=0; i<comp.length; i++ ) {
+            if ( ignore!=null && ignore.equals(comp[i]) ) continue;
+            if ( local.getMember(comp[i]) == null ) result.add(comp[i]);
+        }
+        return (MemberImpl[])result.toArray(new MemberImpl[result.size()]);
+    }
+
+    
     
     
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org