You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC

svn commit: r467206 [7/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/ connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/ connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/ connectors/trunk/jk/native/iis/ connectors/trunk/jk/nat...

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Mon Oct 23 19:45:46 2006
@@ -1,840 +1,840 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.group.AbsoluteOrder;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-
-/**
- * <p>Title: Auto merging leader election algorithm</p>
- *
- * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
- *    it also merges groups automatically when members are discovered that weren't part of the group.
- *    </p>
- * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
- * </p>
- * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
- * to pass a token ring of the current membership.<br>
- * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
- * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
- * nodes are receiving pings from all the other nodes. 
- * meaning, that node{i} receives pings from node{all}-node{i}<br>
- * but the following could happen if a multicast problem occurs.
- * A has members {B,C,D}<br>
- * B has members {A,C}<br>
- * C has members {D,E}<br>
- * D has members {A,B,C,E}<br>
- * E has members {A,C,D}<br>
- * Because the default Tribes membership implementation, relies on the multicast packets to 
- * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
- * <br>
- * To best explain how this algorithm works, lets take the above example:
- * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
- * where messages overlap, as they all depend on absolute order<br>
- * Scenario 1: A,B,C,D,E all come online at the same time
- * Eval phase, A thinks of itself as leader, B thinks of A as leader,
- * C thinks of itself as leader, D,E think of A as leader<br>
- * Token phase:<br>
- * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
- * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
- * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
- * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
- * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
- * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
- * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
- * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
- * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
- * At this point, the state looks like<br>
- * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
- * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
- * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
- * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
- * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
- * <br>
- * A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader.
- * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have 
- * arrived at the same membership and all nodes are informed of each other.<br>
- * To synchronize the rest we simply perform the following check at A when A receives X:<br>
- * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
- * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
- * When A receives X again, the token is complete. <br>
- * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
- * install and accept the view.
- * </p>
- * <p>
- * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
- * Lets also assume that C1 sees the following view {B,D,E}<br>
- * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
- * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
- * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
- * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
- * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
- * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
- * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
- * </p>
- * <p>To ensure that the view gets implemented at all nodes at the same time, 
- *    A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
- * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
- *
- * <p>The example above, of course can be simplified with a finite statemachine:<br>
- * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br>
- * Maybe I'll do a state diagram :)
- * </p>
- * <h2>State Diagrams</h2>
- * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
- * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
- * 
- * @author Filip Hanik
- * @version 1.0
- * 
- * 
- * 
- */
-public class NonBlockingCoordinator extends ChannelInterceptorBase {
-    
-    /**
-     * header for a coordination message
-     */
-    protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
-    /**
-     * Coordination request
-     */
-    protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
-    /**
-     * Coordination confirmation, for blocking installations
-     */
-    protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
-    
-    /**
-     * Alive message
-     */
-    protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
-                                                            -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
-                                                            74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
-                                                            119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
-    /**
-     * Time to wait for coordination timeout
-     */
-    protected long waitForCoordMsgTimeout = 15000;
-    /**
-     * Our current view
-     */
-    protected Membership view = null;
-    /**
-     * Our current viewId
-     */
-    protected UniqueId viewId;
-
-    /**
-     * Our nonblocking membership
-     */
-    protected Membership membership = null;
-    
-    /**
-     * indicates that we are running an election 
-     * and this is the one we are running
-     */
-    protected UniqueId suggestedviewId;
-    protected Membership suggestedView;
-    
-    protected boolean started = false;
-    protected final int startsvc = 0xFFFF;
-    
-    protected Object electionMutex = new Object();
-    
-    protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
-    
-    /**
-     * Creates the interceptor; all state is set up by the field initializers.
-     */
-    public NonBlockingCoordinator() {
-        //the superclass constructor is invoked implicitly
-    }
-    
-//============================================================================================================    
-//              COORDINATION HANDLING
-//============================================================================================================
-    
-    /**
-     * Starts a leader election, unless one is already running or the installed view
-     * already matches the current membership.
-     * <p>If there are no other members, a single member view is installed directly.
-     * If the local member compares lower than the first of the other members (or
-     * <code>force</code> is set) an election request is sent to that member; otherwise
-     * this call waits up to <code>waitForCoordMsgTimeout</code> ms on
-     * <code>electionMutex</code> for a coordination message to arrive.</p>
-     * @param force boolean - send the election request even if the local member is not the leader candidate
-     * @throws ChannelException if the election message can not be sent
-     */
-    public void startElection(boolean force) throws ChannelException {
-        synchronized (electionMutex) {
-            MemberImpl local = (MemberImpl)getLocalMember(false);
-            MemberImpl[] others = (MemberImpl[])membership.getMembers();
-            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
-            if ( others.length == 0 ) {
-                this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
-                this.view = new Membership(local,AbsoluteOrder.comp, true);
-                this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
-                return; //the only member, no need for an election
-            }
-            if ( suggestedviewId != null ) {
-                //an election is already registered; drop it if it suggests exactly the installed view
-                if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 &&  Arrays.diff(suggestedView,view,local).length == 0) {
-                    suggestedviewId = null;
-                    suggestedView = null;
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
-                } else {
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
-                }
-                return; //election already running, I'm not allowed to have two of them
-            }
-            if ( view != null && Arrays.diff(view,membership,local).length == 0 &&  Arrays.diff(membership,view,local).length == 0) {
-                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
-                return; //already have this view installed
-            }
-            int prio = AbsoluteOrder.comp.compare(local,others[0]);
-            MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
-            if ( local.equals(leader) || force ) {
-                CoordinationMessage msg = createElectionMsg(local, others, leader);
-                suggestedviewId = msg.getId();
-                suggestedView = new Membership(local,AbsoluteOrder.comp,true);
-                Arrays.fill(suggestedView,msg.getMembers());
-                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
-                sendElectionMsg(local,others[0],msg);
-            } else {
-                try {
-                    coordMsgReceived.set(false);
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
-                    electionMutex.wait(waitForCoordMsgTimeout);
-                }catch ( InterruptedException x ) {
-                    //restore the interrupt status for the caller; the static Thread.interrupted()
-                    //that used to be invoked here only clears the flag
-                    Thread.currentThread().interrupt();
-                }
-                if ( suggestedviewId == null && (!coordMsgReceived.get())) {
-                    //no message arrived; forcing an election here is currently disabled
-//                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
-//                    startElection(true);
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
-                } else {
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
-                }
-            }//end if
-            
-        }
-    }
-
-    /**
-     * Builds a COORD_REQUEST message with a freshly generated id.
-     * The member list is taken from a temporary Membership (created with AbsoluteOrder.comp)
-     * that is filled with <code>others</code> and reset once the list has been read.
-     * @param local MemberImpl - used as the source of the message
-     * @param others MemberImpl[] - the other known members
-     * @param leader MemberImpl - the leader to announce
-     * @return CoordinationMessage
-     */
-    private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) {
-        Membership scratch = new Membership(local,AbsoluteOrder.comp,true);
-        Arrays.fill(scratch,others);
-        MemberImpl[] candidates = scratch.getMembers();
-        scratch.reset();
-        UniqueId electionId = new UniqueId(UUIDGenerator.randomUUID(true));
-        return new CoordinationMessage(leader, local, candidates, electionId, COORD_REQUEST);
-    }
-
-    /**
-     * Fires an EVT_SEND_MSG event and sends the election message to a single member.
-     * @param local MemberImpl - set as the address of the outgoing data
-     * @param next MemberImpl - the only destination
-     * @param msg CoordinationMessage - the message to serialize and send
-     * @throws ChannelException if the send fails
-     */
-    protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException {
-        String targetName = next.getName();
-        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+targetName+")"));
-        Member[] destination = new Member[] {next};
-        ChannelData data = createData(msg, local);
-        super.sendMessage(destination, data, null);
-    }
-    
-    protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { 
-        int next = Arrays.nextIndex(local,msg.getMembers());
-        int current = next;
-        msg.leader = msg.getMembers()[0];
-        boolean sent =  false;
-        while ( !sent && current >= 0 ) {
-            try {
-                sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
-                sent = true;
-            }catch ( ChannelException x  ) {
-                log.warn("Unable to send election message to:"+msg.getMembers()[current]);
-                current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
-                if ( current == next ) throw x;
-            }
-        }
-    }
-    
-    public Member getNextInLine(MemberImpl local, MemberImpl[] others) { //NOTE(review): unfinished stub, currently always returns null
-        MemberImpl result = null;
-        for ( int i=0; i<others.length; i++ ) {
-            //empty loop body: nothing is ever selected from others
-        }
-        return result;
-    }
-    
-    public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
-        msg.write();
-        ChannelData data = new ChannelData(true);
-        data.setAddress(local);
-        data.setMessage(msg.getBuffer());
-        data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
-        data.setTimestamp(System.currentTimeMillis());
-        return data;
-    }
-    
-    protected void viewChange(UniqueId viewId, Member[] view) {
-        //empty for now, called by handleViewConf after a view is accepted; meant to invoke any listeners
-    }
-    
-    protected boolean alive(Member mbr) {
-        return TcpFailureDetector.memberAlive(mbr,
-                                              COORD_ALIVE,
-                                              false,
-                                              false,
-                                              waitForCoordMsgTimeout,
-                                              waitForCoordMsgTimeout,
-                                              getOptionFlag());
-    }
-    
-    /**
-     * Merges the members of an arriving message with the locally known members.
-     * Every member in the difference between the merged set and the local membership
-     * is checked with {@link #alive(Member)}: members that do not respond are removed
-     * from the merged set, the others are added locally without triggering an election.
-     * @param msg CoordinationMessage
-     * @param sender Member - not used by this method
-     * @return Membership - the merged membership
-     */
-    protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
-        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
-        MemberImpl self = (MemberImpl)getLocalMember(false);
-        Membership merged = new Membership(self,AbsoluteOrder.comp,true);
-        Arrays.fill(merged,msg.getMembers());
-        Arrays.fill(merged,getMembers());
-        Member[] unknown = Arrays.diff(merged,membership,self);
-        for ( int i=0; i<unknown.length; i++ ) {
-            Member candidate = unknown[i];
-            if ( alive(candidate) ) {
-                memberAdded(candidate,false);
-            } else {
-                merged.removeMember((MemberImpl)candidate);
-            }
-        }
-        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
-        return merged;
-    }
-    
-    /**
-     * Entry point for an incoming coordination message.
-     * Flags that a coordination message was received and wakes up threads waiting on
-     * <code>electionMutex</code>, stamps the message with the arrival time, merges the
-     * memberships and then dispatches to the view confirmation or the token handler.
-     * @param msg CoordinationMessage
-     * @param sender Member
-     * @throws ChannelException if a handler fails to send a message
-     */
-    protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
-        if ( !coordMsgReceived.get() ) {
-            coordMsgReceived.set(true);
-            //wake up startElection(), it may be waiting for a message
-            synchronized (electionMutex) { electionMutex.notifyAll();}
-        } 
-        msg.timestamp = System.currentTimeMillis();
-        Membership merged = mergeOnArrive(msg, sender);
-        if (isViewConf(msg)) handleViewConf(msg, sender, merged);
-        else handleToken(msg, sender, merged);
-    }
-    
-    /**
-     * Dispatches a token depending on whether the local member is the source of the message.
-     * @param msg CoordinationMessage
-     * @param sender Member
-     * @param merged Membership
-     * @throws ChannelException
-     */
-    protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
-        MemberImpl self = (MemberImpl)getLocalMember(false);
-        boolean ownToken = self.equals(msg.getSource());
-        if ( !ownToken ) {
-            handleOtherToken(self, msg, sender,merged);
-            return;
-        }
-        //my message msg.src=local
-        handleMyToken(self, msg, sender,merged);
-    }
-    
-    /**
-     * Handles a token whose source is the local member.
-     * <ul>
-     * <li>another member is the leader: the local suggestion is dropped and the token is forwarded</li>
-     * <li>local is the leader but the member lists differ: the suggestion is updated and the token is sent around again</li>
-     * <li>local is the leader and the member lists match: a COORD_CONF message is sent to all other members and the view is installed locally</li>
-     * </ul>
-     * @param local MemberImpl
-     * @param msg CoordinationMessage
-     * @param sender Member - not used by this method
-     * @param merged Membership
-     * @throws ChannelException if a message can not be sent
-     */
-    protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
-        if ( !local.equals(msg.getLeader()) ) {
-            //leadership change
-            suggestedView = null;
-            suggestedviewId = null;
-            msg.view = (MemberImpl[])merged.getMembers();
-            sendElectionMsgToNextInline(local,msg);
-            return;
-        }
-        //no leadership change
-        if ( !Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
-            //membership change
-            suggestedView = new Membership(local,AbsoluteOrder.comp,true);
-            suggestedviewId = msg.getId();
-            Arrays.fill(suggestedView,merged.getMembers());
-            msg.view = (MemberImpl[])merged.getMembers();
-            sendElectionMsgToNextInline(local,msg);
-            return;
-        }
-        //the token is complete, confirm the view
-        msg.type = COORD_CONF;
-        Member[] recipients = Arrays.remove(msg.getMembers(),local);
-        super.sendMessage(recipients,createData(msg,local),null);
-        handleViewConf(msg,local,merged);
-    }
-    
-    /**
-     * Handles a token that was created by another member. If the message names the local
-     * member as leader nothing is done (starting an election here is disabled), otherwise
-     * the merged member list is put into the message and the token is forwarded.
-     * @param local MemberImpl
-     * @param msg CoordinationMessage
-     * @param sender Member - not used by this method
-     * @param merged Membership
-     * @throws ChannelException if the token can not be forwarded
-     */
-    protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
-        boolean iAmLeader = local.equals(msg.getLeader());
-        if ( !iAmLeader ) {
-            msg.view = (MemberImpl[])merged.getMembers();
-            sendElectionMsgToNextInline(local,msg);
-        }
-        //else: I am the new leader
-        //startElection(false);
-    }
-    
-    /**
-     * Installs the view carried by a confirmation message, unless a view with the same id
-     * is already installed. A pending local suggestion is cleared when it has the same id
-     * or when its first member compares lower than the first merged member. Afterwards
-     * {@link #viewChange(UniqueId, Member[])} is invoked, EVT_CONF_RX is fired and a new
-     * election is started if {@link #hasHigherPriority(Member[], Member[])} says so.
-     * @param msg CoordinationMessage
-     * @param sender Member - not used by this method
-     * @param merged Membership
-     * @throws ChannelException if a follow up election fails
-     */
-    protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
-        boolean alreadyInstalled = viewId != null && msg.getId().equals(viewId);
-        if ( alreadyInstalled ) return;//we already have this view
-
-        view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
-        Arrays.fill(view,msg.getMembers());
-        viewId = msg.getId();
-
-        //drop our own suggestion if it was just confirmed or if it is outranked
-        if ( viewId.equals(suggestedviewId) ||
-             (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0) ) {
-            suggestedView = null;
-            suggestedviewId = null;
-        }
-
-        viewChange(viewId,view.getMembers());
-        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
-
-        boolean noElectionPending = (suggestedviewId == null);
-        if ( noElectionPending && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
-            startElection(false);
-        }
-    }
-    
-    /**
-     * Tests the type of a message against COORD_CONF.
-     * @param msg CoordinationMessage
-     * @return boolean - the result of Arrays.contains for the message type and COORD_CONF
-     */
-    protected boolean isViewConf(CoordinationMessage msg) {
-        byte[] kind = msg.getType();
-        return Arrays.contains(kind,0,COORD_CONF,0,COORD_CONF.length);
-    }
-    
-    /**
-     * Compares the first members of two member lists after passing both arrays to
-     * AbsoluteOrder.absoluteOrder (NOTE(review): presumably sorts them in place - confirm).
-     * @param complete Member[]
-     * @param local Member[]
-     * @return boolean - false if <code>local</code> is null or empty, true if <code>complete</code>
-     * is null or empty, otherwise whether complete[0] compares greater than local[0]
-     */
-    protected boolean hasHigherPriority(Member[] complete, Member[] local) {
-        boolean localEmpty = (local == null) || (local.length == 0);
-        if ( localEmpty ) return false;
-        boolean completeEmpty = (complete == null) || (complete.length == 0);
-        if ( completeEmpty ) return true;
-        AbsoluteOrder.absoluteOrder(complete);
-        AbsoluteOrder.absoluteOrder(local);
-        int order = AbsoluteOrder.comp.compare(complete[0],local[0]);
-        return order > 0;
-    }
-
-    
-    /**
-     * Returns the coordinator, which is the first member of the installed view.
-     * @return Member - null if no view is installed or the view has no members
-     */
-    public Member getCoordinator() {
-        if ( view == null || !view.hasMembers() ) return null;
-        return view.getMembers()[0];
-    }
-    
-    /**
-     * Returns the members of the installed view.
-     * @return Member[] - an empty array if no view is installed or the view has no members
-     */
-    public Member[] getView() {
-        if ( view == null || !view.hasMembers() ) return new Member[0];
-        return view.getMembers();
-    }
-    
-    /**
-     * @return UniqueId - the id of the installed view, may be null
-     */
-    public UniqueId getViewId() {
-        return this.viewId;
-    }
-    
-    /**
-    * Block in/out messages while a election is going on
-    */
-   protected void halt() {
-       //empty body: no blocking is implemented here, stop() calls this before shutting down
-   }
-
-   /**
-    * Release lock for in/out messages election is completed
-    */
-   protected void release() {
-       //empty body: nothing to release, stop() calls this from its finally block
-   }
-
-   /**
-    * Wait for an election to end
-    */
-   protected void waitForRelease() {
-       //empty body: returns immediately, sendMessage() calls this before every send
-   }
-
-    
-//============================================================================================================    
-//              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
-//============================================================================================================
-    public void start(int svc) throws ChannelException {
-            if (membership == null) setupMembership();
-            if (started)return;
-            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
-            super.start(startsvc);
-            started = true;
-            if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
-            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
-            startElection(false);
-    }
-
-    /**
-     * Stops the interceptor and clears the view, the suggested view and the membership.
-     * The state change happens while holding <code>electionMutex</code>; nothing is done
-     * if the interceptor is not started. The next interceptor is always stopped with
-     * <code>startsvc</code>, the <code>svc</code> argument is not used.
-     * @param svc int
-     * @throws ChannelException
-     */
-    public void stop(int svc) throws ChannelException {
-        try {
-            halt();
-            synchronized (electionMutex) {
-                if ( started ) {
-                    started = false;
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
-                    super.stop(startsvc);
-                    view = null;
-                    viewId = null;
-                    suggestedView = null;
-                    suggestedviewId = null;
-                    membership.reset();
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
-                }
-            }
-        } finally {
-            release();
-        }
-    }
-    
-    
-    /**
-     * Passes the message on to the next interceptor after calling {@link #waitForRelease()}.
-     * @param destination Member[]
-     * @param msg ChannelMessage
-     * @param payload InterceptorPayload
-     * @throws ChannelException
-     */
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
-        //gate for outgoing traffic while an election is going on
-        this.waitForRelease();
-        super.sendMessage(destination, msg, payload);
-    }
-
-    /**
-     * Filters incoming messages: alive messages are only reported as an event,
-     * coordination messages are parsed and processed here, everything else is
-     * handed to the next interceptor.
-     * @param msg ChannelMessage
-     */
-    public void messageReceived(ChannelMessage msg) {
-        boolean aliveMsg = Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length);
-        if ( aliveMsg ) {
-            //ignore message, its an alive message
-            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));
-            return;
-        }
-        boolean coordMsg = Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length);
-        if ( !coordMsg ) {
-            super.messageReceived(msg);
-            return;
-        }
-        try {
-            CoordinationMessage election = new CoordinationMessage(msg.getMessage());
-            Member[] carried = election.getMembers();
-            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(carried)+")"));
-            processCoordMessage(election, msg.getAddress());
-        }catch ( ChannelException x ) {
-            log.error("Error processing coordination message. Could be fatal.",x);
-        }
-    }
-
-    public boolean accept(ChannelMessage msg) {
-        return super.accept(msg);
-    }
-
-    /**
-     * Same as <code>memberAdded(member,true)</code>, i.e. an election may be started.
-     * @param member Member
-     */
-    public void memberAdded(Member member) {
-        final boolean elect = true;
-        memberAdded(member,elect);
-    }
-
-    public void memberAdded(Member member,boolean elect) {
-        try {
-            if ( membership == null ) setupMembership();
-            if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
-            try {
-                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
-                if (started && elect) startElection(false);
-            }catch ( ChannelException x ) {
-                log.error("Unable to start election when member was added.",x);
-            }
-        }finally {
-        }
-        
-    }
-
-    /**
-     * Removes a member from the local membership and notifies the next interceptor.
-     * An EVT_MBR_DEL event is fired and, when the interceptor is started and the local
-     * member is the coordinator or the highest member, a forced election is started.
-     * A failing election is logged, not propagated.
-     * @param member Member
-     */
-    public void memberDisappeared(Member member) {
-        membership.removeMember((MemberImpl)member);
-        super.memberDisappeared(member);
-        try {
-            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
-            //to do, if a member disappears, only the coordinator can start
-            if ( started && (isCoordinator() || isHighest()) ) {
-                startElection(true);
-            }
-        }catch ( ChannelException x ) {
-            log.error("Unable to start election when member was removed.",x);
-        }
-    }
-    
-    public boolean isHighest() {
-        Member local = getLocalMember(false);
-        if ( membership.getMembers().length == 0 ) return true;
-        else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
-    }
-    
-    /**
-     * @return boolean - true if a coordinator exists and it equals the local member
-     */
-    public boolean isCoordinator() {
-        Member coord = getCoordinator();
-        if ( coord == null ) return false;
-        return getLocalMember(false).equals(coord);
-    }
-
-    public void heartbeat() {
-        try {
-            MemberImpl local = (MemberImpl)getLocalMember(false);
-            if ( view != null && (Arrays.diff(view,membership,local).length != 0 ||  Arrays.diff(membership,view,local).length != 0) ) {
-                if ( isHighest() ) {
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
-                                                               "Heartbeat found inconsistency, restart election"));
-                    startElection(true);
-                }            
-            }
-        } catch ( Exception x  ){
-            log.error("Unable to perform heartbeat.",x);
-        } finally {
-            super.heartbeat();
-        }
-    }
-
-    /**
-     * has members
-     */
-    public boolean hasMembers() {
-        
-        return membership.hasMembers();
-    }
-
-    /**
-     * Get all current cluster members
-     * @return all members or empty array
-     */
-    public Member[] getMembers() {
-        
-        return membership.getMembers();
-    }
-
-    /**
-     *
-     * @param mbr Member
-     * @return Member
-     */
-    public Member getMember(Member mbr) {
-        
-        return membership.getMember(mbr);
-    }
-
-    /**
-     * Return the member that represents this node.
-     *
-     * @return Member
-     */
-    public Member getLocalMember(boolean incAlive) {
-        Member local = super.getLocalMember(incAlive);
-        if ( view == null && (local != null)) setupMembership();
-        return local;
-    }
-    
-    /**
-     * Creates the membership around the local member, unless it already exists.
-     */
-    protected synchronized void setupMembership() {
-        if ( membership != null ) return; //already set up
-        MemberImpl self = (MemberImpl)super.getLocalMember(true);
-        membership = new Membership(self,AbsoluteOrder.comp,false);
-    }
-    
-    
-//============================================================================================================    
-//              HELPER CLASSES FOR COORDINATION
-//============================================================================================================
-    
-    
-   
-    
-    public static class CoordinationMessage {
-        //X{A-ldr, A-src, mbrs-A,B,C,D}
-        protected XByteBuffer buf;
-        protected MemberImpl leader;
-        protected MemberImpl source;
-        protected MemberImpl[] view;
-        protected UniqueId id;
-        protected byte[] type;
-        protected long timestamp = System.currentTimeMillis();
-        
-        /**
-         * Creates a message from a received buffer and parses all fields right away.
-         * @param buf XByteBuffer - holds the serialized message
-         */
-        public CoordinationMessage(XByteBuffer buf) {
-            this.buf = buf;
-            this.parse();
-        }
-
-        /**
-         * Creates a message from its parts and serializes it into a new 4096 byte buffer.
-         * @param leader MemberImpl
-         * @param source MemberImpl
-         * @param view MemberImpl[]
-         * @param id UniqueId
-         * @param type byte[]
-         */
-        public CoordinationMessage(MemberImpl leader, MemberImpl source, MemberImpl[] view, UniqueId id, byte[] type) {
-            this.buf = new XByteBuffer(4096,false);
-            this.leader = leader;
-            this.source = source;
-            this.view = view;
-            this.id = id;
-            this.type = type;
-            //fill the buffer right away
-            write();
-        }
-        
-
-        /**
-         * @return byte[] - the shared COORD_HEADER array of the enclosing class
-         */
-        public byte[] getHeader() {
-            return COORD_HEADER;
-        }
-        
-        /**
-         * @return MemberImpl - the leader, the buffer is parsed first if the field is not set
-         */
-        public MemberImpl getLeader() {
-            if ( this.leader != null ) return this.leader;
-            parse();
-            return this.leader;
-        }
-        
-        /**
-         * @return MemberImpl - the source, the buffer is parsed first if the field is not set
-         */
-        public MemberImpl getSource() {
-            if ( this.source != null ) return this.source;
-            parse();
-            return this.source;
-        }
-        
-        /**
-         * @return UniqueId - the message id, the buffer is parsed first if the field is not set
-         */
-        public UniqueId getId() {
-            if ( this.id != null ) return this.id;
-            parse();
-            return this.id;
-        }
-        
-        /**
-         * @return MemberImpl[] - the members of the view, the buffer is parsed first if the field is not set
-         */
-        public MemberImpl[] getMembers() {
-            if ( this.view != null ) return this.view;
-            parse();
-            return this.view;
-        }
-        
-        /**
-         * @return byte[] - the message type, the buffer is parsed first if the field is not set
-         */
-        public byte[] getType() {
-            if ( this.type != null ) return this.type;
-            parse();
-            return this.type;
-        }
-        
-        /**
-         * @return XByteBuffer - the buffer backing this message
-         */
-        public XByteBuffer getBuffer() {
-            return buf;
-        }
-        
-        /**
-         * Reads all fields from the buffer. Layout as read here: 16 header bytes, then the
-         * leader, the source and each view member as a 4 byte length followed by the member
-         * data (the view is preceded by a 4 byte member count), then 16 bytes for the id
-         * and 16 bytes for the type.
-         */
-        public void parse() {
-            //NOTE(review): assumes getBytesDirect() hands out the same backing array on every call - confirm
-            final byte[] raw = buf.getBytesDirect();
-            //skip the header
-            int pos = 16;
-            //leader
-            int leaderLen = buf.toInt(raw,pos);
-            pos += 4;
-            byte[] leaderData = new byte[leaderLen];
-            System.arraycopy(raw,pos,leaderData,0,leaderLen);
-            leader = MemberImpl.getMember(leaderData);
-            pos += leaderLen;
-            //source
-            int sourceLen = buf.toInt(raw,pos);
-            pos += 4;
-            byte[] sourceData = new byte[sourceLen];
-            System.arraycopy(raw,pos,sourceData,0,sourceLen);
-            source = MemberImpl.getMember(sourceData);
-            pos += sourceLen;
-            //view
-            int count = buf.toInt(raw,pos);
-            pos += 4;
-            view = new MemberImpl[count];
-            for (int i=0; i<view.length; i++ ) {
-                int len = buf.toInt(raw,pos);
-                pos += 4;
-                byte[] memberData = new byte[len];
-                System.arraycopy(raw, pos, memberData, 0, len);
-                view[i] = MemberImpl.getMember(memberData);
-                pos += len;
-            }
-            //id
-            this.id = new UniqueId(raw,pos,16);
-            pos += 16;
-            //type, the last field of the message
-            type = new byte[16];
-            System.arraycopy(raw, pos, type, 0, type.length);
-        }
-        
-        public void write() {
-            buf.reset();
-            //header
-            buf.append(COORD_HEADER,0,COORD_HEADER.length);
-            //leader
-            byte[] ldr = leader.getData(false,false);
-            buf.append(ldr.length);
-            buf.append(ldr,0,ldr.length);
-            ldr = null;
-            //source
-            byte[] src = source.getData(false,false);
-            buf.append(src.length);
-            buf.append(src,0,src.length);
-            src = null;
-            //view
-            buf.append(view.length);
-            for (int i=0; i<view.length; i++ ) {
-                byte[] mbr = view[i].getData(false,false);
-                buf.append(mbr.length);
-                buf.append(mbr,0,mbr.length);
-            }
-            //id
-            buf.append(id.getBytes(),0,id.getBytes().length);
-            buf.append(type,0,type.length);
-        }
-    }
-    
-    public void fireInterceptorEvent(InterceptorEvent event) {
-        if (event instanceof CoordinationEvent &&
-            ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) 
-            log.info(event);
-    }
-    
-    public static class CoordinationEvent implements InterceptorEvent {
-        public static final int EVT_START = 1;
-        public static final int EVT_MBR_ADD = 2;
-        public static final int EVT_MBR_DEL = 3;
-        public static final int EVT_START_ELECT = 4;
-        public static final int EVT_PROCESS_ELECT = 5;
-        public static final int EVT_MSG_ARRIVE = 6;
-        public static final int EVT_PRE_MERGE = 7;
-        public static final int EVT_POST_MERGE = 8;
-        public static final int EVT_WAIT_FOR_MSG = 9;
-        public static final int EVT_SEND_MSG = 10;
-        public static final int EVT_STOP = 11;
-        public static final int EVT_CONF_RX = 12;
-        public static final int EVT_ELECT_ABANDONED = 13;
-        
-        int type;
-        ChannelInterceptor interceptor;
-        Member coord; 
-        Member[] mbrs;
-        String info;
-        Membership view;
-        Membership suggestedView;
-        public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) {
-            this.type = type;
-            this.interceptor = interceptor;
-            this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator();
-            this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers();
-            this.info = info;
-            this.view = ((NonBlockingCoordinator)interceptor).view;
-            this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView;
-        }
-        
-        public int getEventType() {
-            return type;
-        }
-        
-        public String getEventTypeDesc() {
-            switch (type) {
-                case  EVT_START: return "EVT_START:"+info;
-                case  EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
-                case  EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
-                case  EVT_START_ELECT: return "EVT_START_ELECT:"+info;
-                case  EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
-                case  EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
-                case  EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
-                case  EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
-                case  EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
-                case  EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
-                case  EVT_STOP: return "EVT_STOP:"+info;
-                case  EVT_CONF_RX: return "EVT_CONF_RX:"+info;
-                case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
-                default: return "Unknown";
-            }
-        }
-        
-        public ChannelInterceptor getInterceptor() {
-            return interceptor;
-        }
-        
-        public String toString() {
-            StringBuffer buf = new StringBuffer("CoordinationEvent[type=");
-            buf.append(type).append("\n\tLocal:");
-            Member local = interceptor.getLocalMember(false);
-            buf.append(local!=null?local.getName():"").append("\n\tCoord:");
-            buf.append(coord!=null?coord.getName():"").append("\n\tView:");
-            buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:");
-            buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:");
-            buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
-            buf.append(info).append("]");
-            return buf.toString();
-        }
-    }
-
-    
-
-
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.group.AbsoluteOrder;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+/**
+ * <p>Title: Auto merging leader election algorithm</p>
+ *
+ * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
+ *    it also merges groups automatically when members are discovered that weren't part of the group.
+ *    </p>
+ * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
+ * </p>
+ * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
+ * to pass a token ring of the current membership.<br>
+ * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
+ * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
+ * nodes are receiving pings from all the other nodes. 
+ * meaning, that node{i} receives pings from node{all}-node{i}<br>
+ * but the following could happen if a multicast problem occurs.
+ * A has members {B,C,D}<br>
+ * B has members {A,C}<br>
+ * C has members {D,E}<br>
+ * D has members {A,B,C,E}<br>
+ * E has members {A,C,D}<br>
+ * Because the default Tribes membership implementation, relies on the multicast packets to 
+ * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
+ * <br>
+ * To best explain how this algorithm works, lets take the above example:
+ * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
+ * where messages overlap, as they all depend on absolute order<br>
+ * Scenario 1: A,B,C,D,E all come online at the same time
+ * Eval phase, A thinks of itself as leader, B thinks of A as leader,
+ * C thinks of itself as leader, D,E think of A as leader<br>
+ * Token phase:<br>
+ * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
+ * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
+ * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
+ * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
+ * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
+ * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
+ * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
+ * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
+ * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
+ * At this point, the state looks like<br>
+ * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
+ * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
+ * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
+ * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
+ * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
+ * <br>
+ * A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader.
+ * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have 
+ * arrived at the same membership and all nodes are informed of each other.<br>
+ * To synchronize the rest we simply perform the following check at A when A receives X:<br>
+ * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
+ * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
+ * When A receives X again, the token is complete. <br>
+ * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
+ * install and accept the view.
+ * </p>
+ * <p>
+ * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
+ * Lets also assume that C1 sees the following view {B,D,E}<br>
+ * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
+ * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
+ * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
+ * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
+ * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
+ * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
+ * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
+ * </p>
+ * <p>To ensure that the view gets implemented at all nodes at the same time, 
+ *    A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
+ * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
+ *
+ * <p>The example above, of course can be simplified with a finite statemachine:<br>
+ * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br>
+ * Maybe I'll do a state diagram :)
+ * </p>
+ * <h2>State Diagrams</h2>
+ * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
+ * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
+ * 
+ * @author Filip Hanik
+ * @version 1.0
+ * 
+ * 
+ * 
+ */
+public class NonBlockingCoordinator extends ChannelInterceptorBase {
+    
+    /**
+     * header for a coordination message
+     */
+    protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
+    /**
+     * Coordination request
+     */
+    protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
+    /**
+     * Coordination confirmation, for blocking installations
+     */
+    protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
+    
+    /**
+     * Alive message
+     */
+    protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
+                                                            -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
+                                                            74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
+                                                            119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
+    /**
+     * Time to wait for coordination timeout
+     */
+    protected long waitForCoordMsgTimeout = 15000;
+    /**
+     * Our current view
+     */
+    protected Membership view = null;
+    /**
+     * Our current viewId
+     */
+    protected UniqueId viewId;
+
+    /**
+     * Our nonblocking membership
+     */
+    protected Membership membership = null;
+    
+    /**
+     * indicates that we are running an election 
+     * and this is the one we are running
+     */
+    protected UniqueId suggestedviewId;
+    protected Membership suggestedView;
+    
+    protected boolean started = false;
+    protected final int startsvc = 0xFFFF;
+    
+    protected Object electionMutex = new Object();
+    
+    protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
+    
+    /**
+     * Creates the interceptor. Nothing beyond the (implicit) superclass
+     * constructor call happens here.
+     */
+    public NonBlockingCoordinator() {
+        // superclass constructor is invoked implicitly
+    }
+    
+//============================================================================================================    
+//              COORDINATION HANDLING
+//============================================================================================================
+    
+    /**
+     * Starts an election for a new view, unless an election is already running
+     * or the installed view already matches the current membership.
+     * <p>
+     * The whole method runs while holding <code>electionMutex</code>. If no other
+     * members are known, a view containing only the local member is installed
+     * directly. Otherwise, if the local member is the leader of its own membership
+     * (or <code>force</code> is set) an election request is created and sent to the
+     * first entry of <code>membership.getMembers()</code>; if not, the method waits up
+     * to <code>waitForCoordMsgTimeout</code> milliseconds for a coordination message.
+     *
+     * @param force send an election request even when the local member is not the leader of its membership
+     * @throws ChannelException if installing the view or sending the election message fails
+     */
+    public void startElection(boolean force) throws ChannelException {
+        synchronized (electionMutex) {
+            MemberImpl local = (MemberImpl)getLocalMember(false);
+            MemberImpl[] others = (MemberImpl[])membership.getMembers();
+            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
+            if ( others.length == 0 ) {
+                this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
+                this.view = new Membership(local,AbsoluteOrder.comp, true);
+                this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
+                return; //the only member, no need for an election
+            }
+            if ( suggestedviewId != null ) {
+                //an election is in progress; drop it only if what it suggests is already installed
+                if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 &&  Arrays.diff(suggestedView,view,local).length == 0) {
+                    suggestedviewId = null;
+                    suggestedView = null;
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
+                } else {
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
+                }
+                return; //election already running, I'm not allowed to have two of them
+            }
+            if ( view != null && Arrays.diff(view,membership,local).length == 0 &&  Arrays.diff(membership,view,local).length == 0) {
+                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
+                return; //already have this view installed
+            }
+            int prio = AbsoluteOrder.comp.compare(local,others[0]);
+            MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
+            if ( local.equals(leader) || force ) {
+                CoordinationMessage msg = createElectionMsg(local, others, leader);
+                //remember the election we are running so a second one is not started
+                suggestedviewId = msg.getId();
+                suggestedView = new Membership(local,AbsoluteOrder.comp,true);
+                Arrays.fill(suggestedView,msg.getMembers());
+                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
+                sendElectionMsg(local,others[0],msg);
+            } else {
+                try {
+                    coordMsgReceived.set(false);
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
+                    electionMutex.wait(waitForCoordMsgTimeout);
+                }catch ( InterruptedException x ) {
+                    //restore the interrupt status for the caller; the static
+                    //Thread.interrupted() would clear the flag instead of setting it
+                    Thread.currentThread().interrupt();
+                }
+                if ( suggestedviewId == null && (!coordMsgReceived.get())) {
+                    //no message arrived, send the coord msg
+//                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
+//                    startElection(true);
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
+                } else {
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
+                }
+            }//end if
+
+        }
+    }
+
+    /**
+     * Builds a coordination request whose member list is the local member
+     * combined with <code>others</code>, as returned by a temporary
+     * <code>Membership</code> created with <code>AbsoluteOrder.comp</code>.
+     *
+     * @param local the local member, used as the message source
+     * @param others the other members to include in the message
+     * @param leader the member to record as leader in the message
+     * @return a new <code>COORD_REQUEST</code> message with a freshly generated id
+     */
+    private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) {
+        Membership electorate = new Membership(local,AbsoluteOrder.comp,true);
+        Arrays.fill(electorate,others);
+        MemberImpl[] members = electorate.getMembers();
+        electorate.reset();
+        UniqueId electionId = new UniqueId(UUIDGenerator.randomUUID(true));
+        return new CoordinationMessage(leader, local, members, electionId, COORD_REQUEST);
+    }
+
+    /**
+     * Fires an <code>EVT_SEND_MSG</code> event and sends the election message to a single member.
+     *
+     * @param local the local member, used as the address of the outgoing data
+     * @param next the member to send the message to
+     * @param msg the coordination message to send
+     * @throws ChannelException if the underlying send fails
+     */
+    protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException {
+        String info = "Sending election message to("+next.getName()+")";
+        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,info));
+        Member[] destination = new Member[] {next};
+        ChannelData payload = createData(msg, local);
+        super.sendMessage(destination, payload, null);
+    }
+    
+    /**
+     * Sends the election message to the member that follows <code>local</code> in the
+     * message's member list, moving on to the following member whenever a send fails.
+     *
+     * @param local the local member
+     * @param msg the coordination message; its leader is overwritten with the first member of its list
+     * @throws ChannelException the last send failure, once the search has wrapped around to the first candidate
+     */
+    protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { 
+        //index of the first candidate; used to detect that every member has been tried
+        int next = Arrays.nextIndex(local,msg.getMembers());
+        int current = next;
+        //the first member of the message's list is recorded as the leader
+        msg.leader = msg.getMembers()[0];
+        boolean sent =  false;
+        //a negative index ends the loop without sending
+        while ( !sent && current >= 0 ) {
+            try {
+                sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
+                sent = true;
+            }catch ( ChannelException x  ) {
+                log.warn("Unable to send election message to:"+msg.getMembers()[current]);
+                //advance to the member after the one that failed
+                current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
+                //back at the starting candidate: nobody could be reached
+                if ( current == next ) throw x;
+            }
+        }
+    }
+    
+    /**
+     * Apparently intended to return the member that follows <code>local</code> in
+     * <code>others</code>.
+     * NOTE(review): the loop body is empty, so for a non-null <code>others</code> this
+     * always returns <code>null</code> - confirm whether an implementation is still planned.
+     *
+     * @param local the local member (not read by the current code)
+     * @param others the candidate members (only the array length is read)
+     * @return <code>null</code> in the current implementation
+     */
+    public Member getNextInLine(MemberImpl local, MemberImpl[] others) {
+        MemberImpl result = null;
+        //NOTE(review): empty loop, result is never assigned
+        for ( int i=0; i<others.length; i++ ) {
+            
+        }
+        return result;
+    }
+    
+    /**
+     * Serializes the coordination message and wraps its buffer in a
+     * <code>ChannelData</code> addressed from the local member.
+     *
+     * @param msg the message to serialize; <code>write()</code> is invoked on it
+     * @param local the member set as the address of the returned data
+     * @return channel data carrying the message buffer, sent with <code>SEND_OPTIONS_USE_ACK</code>
+     */
+    public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
+        //make sure the buffer reflects the current field values
+        msg.write();
+        XByteBuffer serialized = msg.getBuffer();
+        ChannelData envelope = new ChannelData(true);
+        envelope.setAddress(local);
+        envelope.setMessage(serialized);
+        envelope.setOptions(Channel.SEND_OPTIONS_USE_ACK);
+        envelope.setTimestamp(System.currentTimeMillis());
+        return envelope;
+    }
+    
+    /**
+     * Hook invoked by <code>handleViewConf</code> after a new view has been installed.
+     * The current implementation is empty.
+     *
+     * @param viewId the id of the installed view
+     * @param view the members of the installed view
+     */
+    protected void viewChange(UniqueId viewId, Member[] view) {
+        //invoke any listeners
+    }
+    
+    /**
+     * Checks a member through <code>TcpFailureDetector.memberAlive</code>, sending the
+     * <code>COORD_ALIVE</code> payload and using <code>waitForCoordMsgTimeout</code>
+     * for both timeout arguments.
+     *
+     * @param mbr the member to check
+     * @return the result reported by <code>TcpFailureDetector.memberAlive</code>
+     */
+    protected boolean alive(Member mbr) {
+        return TcpFailureDetector.memberAlive(
+            mbr, COORD_ALIVE,
+            false, false,
+            waitForCoordMsgTimeout, waitForCoordMsgTimeout,
+            getOptionFlag());
+    }
+    
+    /**
+     * Merges the members carried by an arriving coordination message with the
+     * members currently known locally.
+     * <p>
+     * Members in the merged set that are reported by <code>Arrays.diff</code> against the
+     * local membership are checked with {@link #alive(Member)}: those that respond are
+     * added locally (without triggering an election), the others are removed from
+     * the merged set. Pre/post merge events are fired around the operation.
+     *
+     * @param msg the arriving coordination message
+     * @param sender the member that sent the message (not read by this method)
+     * @return the merged membership
+     */
+    protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
+        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
+        MemberImpl self = (MemberImpl)getLocalMember(false);
+        Membership combined = new Membership(self,AbsoluteOrder.comp,true);
+        Arrays.fill(combined,msg.getMembers());
+        Arrays.fill(combined,getMembers());
+        Member[] unknown = Arrays.diff(combined,membership,self);
+        for ( int idx=0; idx<unknown.length; idx++ ) {
+            Member candidate = unknown[idx];
+            if ( alive(candidate) ) {
+                memberAdded(candidate,false);
+            } else {
+                combined.removeMember((MemberImpl)candidate);
+            }
+        }
+        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
+        return combined;
+    }
+    
+    /**
+     * Processes an arriving coordination message: wakes up any thread waiting in
+     * <code>startElection</code>, merges the message's members with the local ones and
+     * then dispatches to the view-confirmation or token handler.
+     *
+     * @param msg the arriving coordination message; its timestamp is set to the current time
+     * @param sender the member the message was received from
+     * @throws ChannelException if handling the message requires a send that fails
+     */
+    protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
+        //flip the flag atomically so that only the thread that changes it notifies the waiters
+        if ( coordMsgReceived.compareAndSet(false,true) ) {
+            synchronized (electionMutex) { electionMutex.notifyAll();}
+        }
+        msg.timestamp = System.currentTimeMillis();
+        Membership merged = mergeOnArrive(msg, sender);
+        if (isViewConf(msg)) handleViewConf(msg, sender, merged);
+        else handleToken(msg, sender, merged);
+    }
+    
+    /**
+     * Routes an election token depending on whether the local member is its source.
+     *
+     * @param msg the election token
+     * @param sender the member the token was received from
+     * @param merged the membership merged from the token and the local members
+     * @throws ChannelException if forwarding or confirming the token fails
+     */
+    protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+        MemberImpl self = (MemberImpl)getLocalMember(false);
+        boolean ownToken = self.equals(msg.getSource());
+        if ( ownToken ) {
+            //the token was originally sent by us
+            handleMyToken(self, msg, sender,merged);
+            return;
+        }
+        handleOtherToken(self, msg, sender,merged);
+    }
+    
+    /**
+     * Handles a token whose source is the local member.
+     * <ul>
+     * <li>Local member is no longer the leader: the running election is dropped and the
+     *     token is forwarded with the merged member list.</li>
+     * <li>Local member is the leader and the token's members equal the merged members:
+     *     the token is turned into a <code>COORD_CONF</code> message, sent to all other
+     *     members and installed locally.</li>
+     * <li>Local member is the leader but the members differ: the merged members become
+     *     the suggested view and the token is sent around again.</li>
+     * </ul>
+     *
+     * @param local the local member
+     * @param msg the token that came back
+     * @param sender the member the token was received from (not read by this method)
+     * @param merged the membership merged from the token and the local members
+     * @throws ChannelException if sending the token or the confirmation fails
+     */
+    protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+        if ( !local.equals(msg.getLeader()) ) {
+            //leadership change
+            suggestedView = null;
+            suggestedviewId = null;
+            msg.view = (MemberImpl[])merged.getMembers();
+            sendElectionMsgToNextInline(local,msg);
+            return;
+        }
+        //no leadership change
+        if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
+            //token is complete, confirm the view
+            msg.type = COORD_CONF;
+            super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null);
+            handleViewConf(msg,local,merged);
+            return;
+        }
+        //membership change
+        suggestedView = new Membership(local,AbsoluteOrder.comp,true);
+        suggestedviewId = msg.getId();
+        Arrays.fill(suggestedView,merged.getMembers());
+        msg.view = (MemberImpl[])merged.getMembers();
+        sendElectionMsgToNextInline(local,msg);
+    }
+    
+    /**
+     * Handles a token whose source is another member. Unless the token names the
+     * local member as leader, its member list is replaced with the merged members
+     * and it is forwarded to the next member in line. When the local member is the
+     * leader nothing is done.
+     *
+     * @param local the local member
+     * @param msg the token received
+     * @param sender the member the token was received from (not read by this method)
+     * @param merged the membership merged from the token and the local members
+     * @throws ChannelException if forwarding the token fails
+     */
+    protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+        boolean iAmLeader = local.equals(msg.getLeader());
+        if ( !iAmLeader ) {
+            msg.view = (MemberImpl[])merged.getMembers();
+            sendElectionMsgToNextInline(local,msg);
+        }
+        //else: I am the new leader
+        //startElection(false);
+    }
+    
+    /**
+     * Installs the view carried by a view confirmation message.
+     * <p>
+     * Does nothing if the message id equals the currently installed view id. Otherwise
+     * the view and view id are replaced, a matching or outranked suggested view is
+     * cleared, listeners are informed and, if no election is running and
+     * <code>hasHigherPriority</code> reports a difference between the merged and the
+     * local membership, a new election is started.
+     *
+     * @param msg the confirmation message
+     * @param sender the member the message was received from (not read by this method)
+     * @param merged the membership merged from the message and the local members
+     * @throws ChannelException if a follow-up election cannot be started
+     */
+    protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+        if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view
+        //install the confirmed view
+        view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
+        Arrays.fill(view,msg.getMembers());
+        viewId = msg.getId();
+        
+        //the election we were running is the one that just got confirmed
+        if ( viewId.equals(suggestedviewId) ) {
+            suggestedView = null;
+            suggestedviewId = null;
+        }
+        
+        //drop our running election if its first member compares lower than the merged view's first member
+        if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) {
+            suggestedView = null;
+            suggestedviewId = null;
+        }
+        
+        viewChange(viewId,view.getMembers());
+        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
+        
+        //no election running and the memberships disagree on the first member: elect again
+        if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
+            startElection(false);
+        }
+    }
+    
+    /**
+     * Tells whether a coordination message is a view confirmation.
+     *
+     * @param msg the message to inspect
+     * @return the result of comparing the message type against <code>COORD_CONF</code>
+     */
+    protected boolean isViewConf(CoordinationMessage msg) {
+        byte[] msgType = msg.getType();
+        return Arrays.contains(msgType,0,COORD_CONF,0,COORD_CONF.length);
+    }
+    
+    /**
+     * Compares the first member of two member arrays after passing both arrays to
+     * <code>AbsoluteOrder.absoluteOrder</code> (presumably an in-place sort - confirm).
+     *
+     * @param complete the merged member list
+     * @param local the locally known member list
+     * @return <code>false</code> if <code>local</code> is null or empty; otherwise <code>true</code>
+     *         if <code>complete</code> is null or empty, or if its first member compares greater
+     *         than the first member of <code>local</code>
+     */
+    protected boolean hasHigherPriority(Member[] complete, Member[] local) {
+        boolean noLocal = ( local == null || local.length == 0 );
+        if ( noLocal ) {
+            return false;
+        }
+        boolean noComplete = ( complete == null || complete.length == 0 );
+        if ( noComplete ) {
+            return true;
+        }
+        AbsoluteOrder.absoluteOrder(complete);
+        AbsoluteOrder.absoluteOrder(local);
+        int order = AbsoluteOrder.comp.compare(complete[0],local[0]);
+        return order > 0;
+    }
+
+    
+    /**
+     * Returns the coordinator, which is the first member of the installed view.
+     * @return the coordinator, or <code>null</code> if no view with members is installed
+     */
+    public Member getCoordinator() {
+        if ( view == null || !view.hasMembers() ) {
+            return null;
+        }
+        return view.getMembers()[0];
+    }
+    
+    /**
+     * Returns the members of the installed view.
+     * @return the view members, or an empty array if no view with members is installed
+     */
+    public Member[] getView() {
+        if ( view != null && view.hasMembers() ) {
+            return view.getMembers();
+        }
+        return new Member[0];
+    }
+    
+    /**
+     * Returns the id of the installed view.
+     * @return the current view id; <code>null</code> until a view has been installed and after <code>stop</code>
+     */
+    public UniqueId getViewId() {
+        return viewId;
+    }
+    
+    /**
+    * Block in/out messages while an election is going on.
+    * The current implementation is empty, so nothing is actually blocked.
+    */
+   protected void halt() {
+
+   }
+
+   /**
+    * Release the lock for in/out messages once an election is completed.
+    * The current implementation is empty.
+    */
+   protected void release() {
+
+   }
+
+   /**
+    * Wait for an election to end.
+    * The current implementation is empty and returns immediately.
+    */
+   protected void waitForRelease() {
+
+   }
+
+    
+//============================================================================================================    
+//              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
+//============================================================================================================
+    /**
+     * Starts the interceptor and kicks off an election. The membership is set up on
+     * first use; a second call while already started returns without doing anything else.
+     * Note that the superclass is started with <code>startsvc</code>, not with <code>svc</code>.
+     *
+     * @param svc the requested service flags (not read by this method)
+     * @throws ChannelException if the superclass fails to start or the election fails
+     */
+    public void start(int svc) throws ChannelException {
+        if ( membership == null ) {
+            setupMembership();
+        }
+        if ( started ) {
+            return;
+        }
+        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
+        super.start(startsvc);
+        started = true;
+        if ( view == null ) {
+            MemberImpl self = (MemberImpl)super.getLocalMember(true);
+            view = new Membership(self, AbsoluteOrder.comp, true);
+        }
+        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
+        startElection(false);
+    }
+
+    /**
+     * Stops the interceptor and clears all view and election state. Does nothing
+     * (apart from calling <code>halt</code> and <code>release</code>) when not started.
+     * Note that the superclass is stopped with <code>startsvc</code>, not with <code>svc</code>.
+     *
+     * @param svc the requested service flags (not read by this method)
+     * @throws ChannelException if the superclass fails to stop
+     */
+    public void stop(int svc) throws ChannelException {
+        try {
+            halt();
+            synchronized (electionMutex) {
+                if ( started ) {
+                    started = false;
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
+                    super.stop(startsvc);
+                    //forget the installed view and any running election
+                    view = null;
+                    viewId = null;
+                    suggestedView = null;
+                    suggestedviewId = null;
+                    membership.reset();
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
+                }
+            }
+        } finally {
+            release();
+        }
+    }
+    
+    
+    /**
+     * Calls <code>waitForRelease()</code> and then passes the message on to the superclass.
+     *
+     * @param destination the members to send to
+     * @param msg the message to send
+     * @param payload the interceptor payload, passed through unchanged
+     * @throws ChannelException if the superclass send fails
+     */
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        waitForRelease();
+        super.sendMessage(destination, msg, payload);
+    }
+
+    /**
+     * Intercepts coordination traffic. Alive messages only fire an event,
+     * coordination messages are parsed and processed, and every other message is
+     * passed on to the superclass.
+     *
+     * @param msg the received message
+     */
+    public void messageReceived(ChannelMessage msg) {
+        if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
+            //ignore message, its an alive message
+            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));
+            return;
+        }
+        if ( !Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
+            //not a coordination message, not ours to handle
+            super.messageReceived(msg);
+            return;
+        }
+        try {
+            CoordinationMessage coordMsg = new CoordinationMessage(msg.getMessage());
+            Member[] carried = coordMsg.getMembers();
+            String info = "Coord Msg Arrived("+Arrays.toNameString(carried)+")";
+            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,info));
+            processCoordMessage(coordMsg, msg.getAddress());
+        } catch ( ChannelException x ) {
+            log.error("Error processing coordination message. Could be fatal.",x);
+        }
+    }
+
+    /**
+     * Delegates the decision to the superclass without any additional checks.
+     *
+     * @param msg the message to check
+     * @return whatever the superclass returns
+     */
+    public boolean accept(ChannelMessage msg) {
+        return super.accept(msg);
+    }
+
+    /**
+     * Handles an added member and allows an election to be started for it;
+     * equivalent to <code>memberAdded(member,true)</code>.
+     *
+     * @param member the member that was added
+     */
+    public void memberAdded(Member member) {
+        memberAdded(member,true);
+    }
+
+    /**
+     * Registers an added member with the local membership and optionally starts an election.
+     * The superclass is only notified when <code>membership.memberAlive</code> returns true
+     * for the member. Election failures are logged, not propagated.
+     *
+     * @param member the member that was added
+     * @param elect whether an election may be started (only done when the interceptor is started)
+     */
+    public void memberAdded(Member member,boolean elect) {
+        if ( membership == null ) {
+            setupMembership();
+        }
+        if ( membership.memberAlive((MemberImpl)member) ) {
+            super.memberAdded(member);
+        }
+        try {
+            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
+            if ( started && elect ) {
+                startElection(false);
+            }
+        } catch ( ChannelException x ) {
+            log.error("Unable to start election when member was added.",x);
+        }
+    }
+
+    /**
+     * Removes a member from the local membership, notifies the superclass and, when
+     * the interceptor is started and the local member is the coordinator or the
+     * highest member, forces a new election. Election failures are logged, not propagated.
+     *
+     * @param member the member that disappeared
+     */
+    public void memberDisappeared(Member member) {
+        membership.removeMember((MemberImpl)member);
+        super.memberDisappeared(member);
+        try {
+            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
+            //to do, if a member disappears, only the coordinator can start
+            boolean mayElect = started && (isCoordinator() || isHighest());
+            if ( mayElect ) {
+                startElection(true);
+            }
+        } catch ( ChannelException x ) {
+            log.error("Unable to start election when member was removed.",x);
+        }
+    }
+    
+    public boolean isHighest() {
+        Member local = getLocalMember(false);
+        if ( membership.getMembers().length == 0 ) return true;
+        else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
+    }
+    
+    /**
+     * Tells whether the local member is the current coordinator.
+     *
+     * @return boolean - true only when a coordinator is known, the local
+     * member is available and the local member equals the coordinator
+     */
+    public boolean isCoordinator() {
+        Member coord = getCoordinator();
+        if ( coord == null ) {
+            return false;
+        }
+        // getLocalMember may return null (it checks for that itself), so
+        // guard before calling equals on it.
+        Member local = getLocalMember(false);
+        return local != null && local.equals(coord);
+    }
+
+    public void heartbeat() {
+        try {
+            MemberImpl local = (MemberImpl)getLocalMember(false);
+            if ( view != null && (Arrays.diff(view,membership,local).length != 0 ||  Arrays.diff(membership,view,local).length != 0) ) {
+                if ( isHighest() ) {
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
+                                                               "Heartbeat found inconsistency, restart election"));
+                    startElection(true);
+                }            
+            }
+        } catch ( Exception x  ){
+            log.error("Unable to perform heartbeat.",x);
+        } finally {
+            super.heartbeat();
+        }
+    }
+
+    /**
+     * Reports whether the coordinator's own membership currently holds any
+     * members.
+     *
+     * @return boolean - the value of <code>membership.hasMembers()</code>
+     */
+    public boolean hasMembers() {
+        final boolean any = membership.hasMembers();
+        return any;
+    }
+
+    /**
+     * Get all current cluster members as tracked by the coordinator's own
+     * membership.
+     *
+     * @return Member[] - the array returned by
+     * <code>membership.getMembers()</code>
+     */
+    public Member[] getMembers() {
+        final Member[] current = membership.getMembers();
+        return current;
+    }
+
+    /**
+     * Looks up a member in the coordinator's own membership.
+     *
+     * @param mbr Member - the member to look up
+     * @return Member - the result of <code>membership.getMember(mbr)</code>
+     */
+    public Member getMember(Member mbr) {
+        final Member found = membership.getMember(mbr);
+        return found;
+    }
+
+    /**
+     * Return the member that represents this node, as supplied by the parent
+     * class. As a side effect the membership is set up when a local member
+     * is available and no view exists yet.
+     *
+     * @param incAlive boolean - passed through to the parent implementation
+     * @return Member - the local member, possibly null
+     */
+    public Member getLocalMember(boolean incAlive) {
+        final Member self = super.getLocalMember(incAlive);
+        // NOTE(review): the guard tests "view" although it is "membership"
+        // that gets initialised - confirm this is intended.
+        if ( self != null && view == null ) {
+            setupMembership();
+        }
+        return self;
+    }
+    
+    /**
+     * Creates the membership on first use; does nothing when it already
+     * exists. The membership is built around the parent's local member and
+     * uses <code>AbsoluteOrder.comp</code> as its comparator.
+     */
+    protected synchronized void setupMembership() {
+        if ( membership != null ) {
+            return;
+        }
+        MemberImpl self = (MemberImpl)super.getLocalMember(true);
+        membership = new Membership(self,AbsoluteOrder.comp,false);
+    }
+    
+    
+//============================================================================================================    
+//              HELPER CLASSES FOR COORDINATION
+//============================================================================================================
+    
+    
+   
+    
+    /**
+     * Coordination message exchanged between coordinators, together with its
+     * byte representation.<br>
+     * Layout produced by {@link #write()} and consumed by {@link #parse()}:
+     * <pre>
+     *   COORD_HEADER
+     *   int leader length, leader bytes
+     *   int source length, source bytes
+     *   int member count, then for each member: int length, member bytes
+     *   id bytes
+     *   type bytes
+     * </pre>
+     * The constructor taking a buffer parses it immediately; the constructor
+     * taking the individual parts serialises them immediately.
+     */
+    public static class CoordinationMessage {
+        //X{A-ldr, A-src, mbrs-A,B,C,D}
+        /** Number of bytes read for the message id when parsing. */
+        private static final int ID_LENGTH = 16;
+        /** Number of bytes read for the message type when parsing. */
+        private static final int TYPE_LENGTH = 16;
+        /** Size in bytes of every length or count prefix. */
+        private static final int INT_LENGTH = 4;
+
+        protected XByteBuffer buf;
+        protected MemberImpl leader;
+        protected MemberImpl source;
+        protected MemberImpl[] view;
+        protected UniqueId id;
+        protected byte[] type;
+        protected long timestamp = System.currentTimeMillis();
+
+        /**
+         * Wraps a serialised message and parses it.
+         * @param buf XByteBuffer - buffer holding the serialised message
+         */
+        public CoordinationMessage(XByteBuffer buf) {
+            this.buf = buf;
+            parse();
+        }
+
+        /**
+         * Builds a message from its parts and serialises it into a new buffer.
+         * @param leader MemberImpl - the leader carried by the message
+         * @param source MemberImpl - the sender of the message
+         * @param view MemberImpl[] - the members carried by the message
+         * @param id UniqueId - the message id
+         * @param type byte[] - the message type marker
+         */
+        public CoordinationMessage(MemberImpl leader,
+                                   MemberImpl source,
+                                   MemberImpl[] view,
+                                   UniqueId id,
+                                   byte[] type) {
+            this.buf = new XByteBuffer(4096,false);
+            this.leader = leader;
+            this.source = source;
+            this.view = view;
+            this.id = id;
+            this.type = type;
+            this.write();
+        }
+
+        /**
+         * @return byte[] - the header that prefixes every coordination message
+         */
+        public byte[] getHeader() {
+            return NonBlockingCoordinator.COORD_HEADER;
+        }
+
+        /**
+         * @return MemberImpl - the leader, parsing the buffer first if it is not set
+         */
+        public MemberImpl getLeader() {
+            if ( leader == null ) parse();
+            return leader;
+        }
+
+        /**
+         * @return MemberImpl - the source, parsing the buffer first if it is not set
+         */
+        public MemberImpl getSource() {
+            if ( source == null ) parse();
+            return source;
+        }
+
+        /**
+         * @return UniqueId - the message id, parsing the buffer first if it is not set
+         */
+        public UniqueId getId() {
+            if ( id == null ) parse();
+            return id;
+        }
+
+        /**
+         * @return MemberImpl[] - the members, parsing the buffer first if they are not set
+         */
+        public MemberImpl[] getMembers() {
+            if ( view == null ) parse();
+            return view;
+        }
+
+        /**
+         * @return byte[] - the message type, parsing the buffer first if it is not set
+         */
+        public byte[] getType() {
+            if (type == null ) parse();
+            return type;
+        }
+
+        /**
+         * @return XByteBuffer - the buffer backing this message
+         */
+        public XByteBuffer getBuffer() {
+            return this.buf;
+        }
+
+        /**
+         * Populates leader, source, view, id and type from the buffer.
+         * Length and count prefixes are checked against the backing array
+         * before anything is allocated, so a corrupt prefix fails fast
+         * instead of triggering a huge allocation.
+         * @throws IndexOutOfBoundsException if a length or count prefix does
+         * not fit into the backing array
+         */
+        public void parse() {
+            byte[] data = buf.getBytesDirect();
+            //header - skip exactly what write() emitted
+            int offset = COORD_HEADER.length;
+            //leader
+            byte[] ldr = readChunk(data,offset);
+            leader = MemberImpl.getMember(ldr);
+            offset += INT_LENGTH + ldr.length;
+            //source
+            byte[] src = readChunk(data,offset);
+            source = MemberImpl.getMember(src);
+            offset += INT_LENGTH + src.length;
+            //view
+            int mbrCount = buf.toInt(data,offset);
+            offset += INT_LENGTH;
+            // every member needs at least its length prefix, which bounds the count
+            if ( mbrCount < 0 || mbrCount > (data.length - offset) / INT_LENGTH ) {
+                throw new IndexOutOfBoundsException("Invalid coordination message: member count "+mbrCount+" at offset "+(offset-INT_LENGTH));
+            }
+            view = new MemberImpl[mbrCount];
+            for (int i=0; i<view.length; i++ ) {
+                byte[] mbr = readChunk(data,offset);
+                view[i] = MemberImpl.getMember(mbr);
+                offset += INT_LENGTH + mbr.length;
+            }
+            //id
+            this.id = new UniqueId(data,offset,ID_LENGTH);
+            offset += ID_LENGTH;
+            //type
+            byte[] typeBytes = new byte[TYPE_LENGTH];
+            System.arraycopy(data, offset, typeBytes, 0, typeBytes.length);
+            type = typeBytes;
+        }
+
+        /**
+         * Serialises header, leader, source, view, id and type into the
+         * buffer, after resetting it.
+         */
+        public void write() {
+            buf.reset();
+            //header
+            buf.append(COORD_HEADER,0,COORD_HEADER.length);
+            //leader
+            appendChunk(leader.getData(false,false));
+            //source
+            appendChunk(source.getData(false,false));
+            //view
+            buf.append(view.length);
+            for (int i=0; i<view.length; i++ ) {
+                appendChunk(view[i].getData(false,false));
+            }
+            //id
+            byte[] idBytes = id.getBytes();
+            buf.append(idBytes,0,idBytes.length);
+            //type
+            buf.append(type,0,type.length);
+        }
+
+        /**
+         * Reads one length-prefixed chunk starting at the given offset.
+         * @param data byte[] - the backing array of the buffer
+         * @param offset int - position of the int length prefix
+         * @return byte[] - a copy of the chunk that follows the prefix
+         */
+        private byte[] readChunk(byte[] data, int offset) {
+            int len = buf.toInt(data,offset);
+            int start = offset + INT_LENGTH;
+            // compare against the remaining space to avoid int overflow
+            if ( len < 0 || len > data.length - start ) {
+                throw new IndexOutOfBoundsException("Invalid coordination message: chunk length "+len+" at offset "+offset);
+            }
+            byte[] chunk = new byte[len];
+            System.arraycopy(data,start,chunk,0,len);
+            return chunk;
+        }
+
+        /**
+         * Appends one chunk to the buffer, preceded by its int length.
+         * @param chunk byte[] - the bytes to append
+         */
+        private void appendChunk(byte[] chunk) {
+            buf.append(chunk.length);
+            buf.append(chunk,0,chunk.length);
+        }
+    }
+    
+    public void fireInterceptorEvent(InterceptorEvent event) {
+        if (event instanceof CoordinationEvent &&
+            ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) 
+            log.info(event);
+    }
+    
+    /**
+     * Event describing a step of the coordination protocol. On construction
+     * it captures the coordinator, the members, the view and the suggested
+     * view of the interceptor that created it.
+     */
+    public static class CoordinationEvent implements InterceptorEvent {
+        public static final int EVT_START = 1;
+        public static final int EVT_MBR_ADD = 2;
+        public static final int EVT_MBR_DEL = 3;
+        public static final int EVT_START_ELECT = 4;
+        public static final int EVT_PROCESS_ELECT = 5;
+        public static final int EVT_MSG_ARRIVE = 6;
+        public static final int EVT_PRE_MERGE = 7;
+        public static final int EVT_POST_MERGE = 8;
+        public static final int EVT_WAIT_FOR_MSG = 9;
+        public static final int EVT_SEND_MSG = 10;
+        public static final int EVT_STOP = 11;
+        public static final int EVT_CONF_RX = 12;
+        public static final int EVT_ELECT_ABANDONED = 13;
+
+        int type;
+        ChannelInterceptor interceptor;
+        Member coord;
+        Member[] mbrs;
+        String info;
+        Membership view;
+        Membership suggestedView;
+
+        /**
+         * @param type int - one of the EVT_ constants
+         * @param interceptor ChannelInterceptor - the originating interceptor,
+         * must be a NonBlockingCoordinator
+         * @param info String - free text describing the event
+         */
+        public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) {
+            NonBlockingCoordinator origin = (NonBlockingCoordinator)interceptor;
+            this.type = type;
+            this.interceptor = interceptor;
+            this.coord = origin.getCoordinator();
+            this.mbrs = origin.membership.getMembers();
+            this.info = info;
+            this.view = origin.view;
+            this.suggestedView = origin.suggestedView;
+        }
+
+        /**
+         * @return int - the EVT_ constant given at construction
+         */
+        public int getEventType() {
+            return type;
+        }
+
+        /**
+         * @return String - the constant's name followed by a colon and the
+         * info text, or "Unknown" for an unrecognised type
+         */
+        public String getEventTypeDesc() {
+            final String name;
+            switch (type) {
+                case EVT_START: name = "EVT_START"; break;
+                case EVT_MBR_ADD: name = "EVT_MBR_ADD"; break;
+                case EVT_MBR_DEL: name = "EVT_MBR_DEL"; break;
+                case EVT_START_ELECT: name = "EVT_START_ELECT"; break;
+                case EVT_PROCESS_ELECT: name = "EVT_PROCESS_ELECT"; break;
+                case EVT_MSG_ARRIVE: name = "EVT_MSG_ARRIVE"; break;
+                case EVT_PRE_MERGE: name = "EVT_PRE_MERGE"; break;
+                case EVT_POST_MERGE: name = "EVT_POST_MERGE"; break;
+                case EVT_WAIT_FOR_MSG: name = "EVT_WAIT_FOR_MSG"; break;
+                case EVT_SEND_MSG: name = "EVT_SEND_MSG"; break;
+                case EVT_STOP: name = "EVT_STOP"; break;
+                case EVT_CONF_RX: name = "EVT_CONF_RX"; break;
+                case EVT_ELECT_ABANDONED: name = "EVT_ELECT_ABANDONED"; break;
+                default: return "Unknown";
+            }
+            return name + ":" + info;
+        }
+
+        /**
+         * @return ChannelInterceptor - the interceptor that created this event
+         */
+        public ChannelInterceptor getInterceptor() {
+            return interceptor;
+        }
+
+        /**
+         * @return String - multi line dump of type, local member, coordinator,
+         * view, suggested view, members and info
+         */
+        public String toString() {
+            StringBuffer out = new StringBuffer("CoordinationEvent[type=");
+            out.append(type);
+            out.append("\n\tLocal:");
+            Member local = interceptor.getLocalMember(false);
+            out.append(local!=null?local.getName():"");
+            out.append("\n\tCoord:");
+            out.append(coord!=null?coord.getName():"");
+            out.append("\n\tView:");
+            out.append(Arrays.toNameString(view!=null?view.getMembers():null));
+            out.append("\n\tSuggested View:");
+            out.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null));
+            out.append("\n\tMembers:");
+            out.append(Arrays.toNameString(mbrs));
+            out.append("\n\tInfo:");
+            out.append(info);
+            out.append("]");
+            return out.toString();
+        }
+    }
+
+    
+
+
+
 }

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org