You are viewing a plain-text version of this content; the link to its canonical version did not survive the plain-text conversion.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC
svn commit: r467206 [7/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/
connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/
connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/
connectors/trunk/jk/native/iis/ connectors/trunk/jk/nat...
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Mon Oct 23 19:45:46 2006
@@ -1,840 +1,840 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.group.AbsoluteOrder;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-
-/**
- * <p>Title: Auto merging leader election algorithm</p>
- *
- * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
- * it also merges groups automatically when members are discovered that werent part of the
- * </p>
- * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
- * </p>
- * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
- * to pass a token ring of the current membership.<br>
- * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
- * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
- * nodes are receiving pings from all the other nodes.
- * meaning, that node{i} receives pings from node{all}-node{i}<br>
- * but the following could happen if a multicast problem occurs.
- * A has members {B,C,D}<br>
- * B has members {A,C}<br>
- * C has members {D,E}<br>
- * D has members {A,B,C,E}<br>
- * E has members {A,C,D}<br>
- * Because the default Tribes membership implementation, relies on the multicast packets to
- * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
- * <br>
- * To best explain how this algorithm works, lets take the above example:
- * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
- * where messages overlap, as they all depend on absolute order<br>
- * Scenario 1: A,B,C,D,E all come online at the same time
- * Eval phase, A thinks of itself as leader, B thinks of A as leader,
- * C thinks of itself as leader, D,E think of A as leader<br>
- * Token phase:<br>
- * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
- * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
- * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
- * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
- * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
- * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
- * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
- * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
- * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
- * At this point, the state looks like<br>
- * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
- * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
- * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
- * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
- * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
- * <br>
- * A message doesn't stop until it reaches its original sender, unless its dropped by a higher leader.
- * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have
- * arrived at the same membership and all nodes are informed of each other.<br>
- * To synchronize the rest we simply perform the following check at A when A receives X:<br>
- * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
- * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
- * When A receives X again, the token is complete. <br>
- * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
- * install and accept the view.
- * </p>
- * <p>
- * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
- * Lets also assume that C1 sees the following view {B,D,E}<br>
- * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
- * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
- * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
- * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
- * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
- * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
- * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
- * </p>
- * <p>To ensure that the view gets implemented at all nodes at the same time,
- * A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
- * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
- *
- * <p>The example above, of course can be simplified with a finite statemachine:<br>
- * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br>
- * Maybe I'll do a state diagram :)
- * </p>
- * <h2>State Diagrams</h2>
- * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
- * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
- *
- * @author Filip Hanik
- * @version 1.0
- *
- *
- *
- */
-public class NonBlockingCoordinator extends ChannelInterceptorBase {
-
- /**
- * header for a coordination message
- */
- protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
- /**
- * Coordination request
- */
- protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
- /**
- * Coordination confirmation, for blocking installations
- */
- protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
-
- /**
- * Alive message
- */
- protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
- -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
- 74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
- 119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
- /**
- * Time to wait for coordination timeout
- */
- protected long waitForCoordMsgTimeout = 15000;
- /**
- * Our current view
- */
- protected Membership view = null;
- /**
- * Out current viewId
- */
- protected UniqueId viewId;
-
- /**
- * Our nonblocking membership
- */
- protected Membership membership = null;
-
- /**
- * indicates that we are running an election
- * and this is the one we are running
- */
- protected UniqueId suggestedviewId;
- protected Membership suggestedView;
-
- protected boolean started = false;
- protected final int startsvc = 0xFFFF;
-
- protected Object electionMutex = new Object();
-
- protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
-
- public NonBlockingCoordinator() {
- super();
- }
-
-//============================================================================================================
-// COORDINATION HANDLING
-//============================================================================================================
-
- public void startElection(boolean force) throws ChannelException {
- synchronized (electionMutex) {
- MemberImpl local = (MemberImpl)getLocalMember(false);
- MemberImpl[] others = (MemberImpl[])membership.getMembers();
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
- if ( others.length == 0 ) {
- this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
- this.view = new Membership(local,AbsoluteOrder.comp, true);
- this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
- return; //the only member, no need for an election
- }
- if ( suggestedviewId != null ) {
-
- if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) {
- suggestedviewId = null;
- suggestedView = null;
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
- } else {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
- }
- return; //election already running, I'm not allowed to have two of them
- }
- if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
- return; //already have this view installed
- }
- int prio = AbsoluteOrder.comp.compare(local,others[0]);
- MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
- if ( local.equals(leader) || force ) {
- CoordinationMessage msg = createElectionMsg(local, others, leader);
- suggestedviewId = msg.getId();
- suggestedView = new Membership(local,AbsoluteOrder.comp,true);
- Arrays.fill(suggestedView,msg.getMembers());
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
- sendElectionMsg(local,others[0],msg);
- } else {
- try {
- coordMsgReceived.set(false);
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
- electionMutex.wait(waitForCoordMsgTimeout);
- }catch ( InterruptedException x ) {
- Thread.currentThread().interrupted();
- }
- if ( suggestedviewId == null && (!coordMsgReceived.get())) {
- //no message arrived, send the coord msg
-// fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
-// startElection(true);
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
- } else {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
- }
- }//end if
-
- }
- }
-
- private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) {
- Membership m = new Membership(local,AbsoluteOrder.comp,true);
- Arrays.fill(m,others);
- MemberImpl[] mbrs = m.getMembers();
- m.reset();
- CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
- return msg;
- }
-
- protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")"));
- super.sendMessage(new Member[] {next}, createData(msg, local), null);
- }
-
- protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException {
- int next = Arrays.nextIndex(local,msg.getMembers());
- int current = next;
- msg.leader = msg.getMembers()[0];
- boolean sent = false;
- while ( !sent && current >= 0 ) {
- try {
- sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
- sent = true;
- }catch ( ChannelException x ) {
- log.warn("Unable to send election message to:"+msg.getMembers()[current]);
- current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
- if ( current == next ) throw x;
- }
- }
- }
-
- public Member getNextInLine(MemberImpl local, MemberImpl[] others) {
- MemberImpl result = null;
- for ( int i=0; i<others.length; i++ ) {
-
- }
- return result;
- }
-
- public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
- msg.write();
- ChannelData data = new ChannelData(true);
- data.setAddress(local);
- data.setMessage(msg.getBuffer());
- data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
- data.setTimestamp(System.currentTimeMillis());
- return data;
- }
-
- protected void viewChange(UniqueId viewId, Member[] view) {
- //invoke any listeners
- }
-
- protected boolean alive(Member mbr) {
- return TcpFailureDetector.memberAlive(mbr,
- COORD_ALIVE,
- false,
- false,
- waitForCoordMsgTimeout,
- waitForCoordMsgTimeout,
- getOptionFlag());
- }
-
- protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
- MemberImpl local = (MemberImpl)getLocalMember(false);
- Membership merged = new Membership(local,AbsoluteOrder.comp,true);
- Arrays.fill(merged,msg.getMembers());
- Arrays.fill(merged,getMembers());
- Member[] diff = Arrays.diff(merged,membership,local);
- for ( int i=0; i<diff.length; i++ ) {
- if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
- else memberAdded(diff[i],false);
- }
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
- return merged;
- }
-
- protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
- if ( !coordMsgReceived.get() ) {
- coordMsgReceived.set(true);
- synchronized (electionMutex) { electionMutex.notifyAll();}
- }
- msg.timestamp = System.currentTimeMillis();
- Membership merged = mergeOnArrive(msg, sender);
- if (isViewConf(msg)) handleViewConf(msg, sender, merged);
- else handleToken(msg, sender, merged);
- ClassLoader loader;
-
- }
-
- protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
- MemberImpl local = (MemberImpl)getLocalMember(false);
- if ( local.equals(msg.getSource()) ) {
- //my message msg.src=local
- handleMyToken(local, msg, sender,merged);
- } else {
- handleOtherToken(local, msg, sender,merged);
- }
- }
-
- protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
- if ( local.equals(msg.getLeader()) ) {
- //no leadership change
- if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
- msg.type = COORD_CONF;
- super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null);
- handleViewConf(msg,local,merged);
- } else {
- //membership change
- suggestedView = new Membership(local,AbsoluteOrder.comp,true);
- suggestedviewId = msg.getId();
- Arrays.fill(suggestedView,merged.getMembers());
- msg.view = (MemberImpl[])merged.getMembers();
- sendElectionMsgToNextInline(local,msg);
- }
- } else {
- //leadership change
- suggestedView = null;
- suggestedviewId = null;
- msg.view = (MemberImpl[])merged.getMembers();
- sendElectionMsgToNextInline(local,msg);
- }
- }
-
- protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
- if ( local.equals(msg.getLeader()) ) {
- //I am the new leader
- //startElection(false);
- } else {
- msg.view = (MemberImpl[])merged.getMembers();
- sendElectionMsgToNextInline(local,msg);
- }
- }
-
- protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
- if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view
- view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
- Arrays.fill(view,msg.getMembers());
- viewId = msg.getId();
-
- if ( viewId.equals(suggestedviewId) ) {
- suggestedView = null;
- suggestedviewId = null;
- }
-
- if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) {
- suggestedView = null;
- suggestedviewId = null;
- }
-
- viewChange(viewId,view.getMembers());
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
-
- if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
- startElection(false);
- }
- }
-
- protected boolean isViewConf(CoordinationMessage msg) {
- return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
- }
-
- protected boolean hasHigherPriority(Member[] complete, Member[] local) {
- if ( local == null || local.length == 0 ) return false;
- if ( complete == null || complete.length == 0 ) return true;
- AbsoluteOrder.absoluteOrder(complete);
- AbsoluteOrder.absoluteOrder(local);
- return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0);
-
- }
-
-
- /**
- * Returns coordinator if one is available
- * @return Member
- */
- public Member getCoordinator() {
- return (view != null && view.hasMembers()) ? view.getMembers()[0] : null;
- }
-
- public Member[] getView() {
- return (view != null && view.hasMembers()) ? view.getMembers() : new Member[0];
- }
-
- public UniqueId getViewId() {
- return viewId;
- }
-
- /**
- * Block in/out messages while a election is going on
- */
- protected void halt() {
-
- }
-
- /**
- * Release lock for in/out messages election is completed
- */
- protected void release() {
-
- }
-
- /**
- * Wait for an election to end
- */
- protected void waitForRelease() {
-
- }
-
-
-//============================================================================================================
-// OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE
-//============================================================================================================
- public void start(int svc) throws ChannelException {
- if (membership == null) setupMembership();
- if (started)return;
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
- super.start(startsvc);
- started = true;
- if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
- startElection(false);
- }
-
- public void stop(int svc) throws ChannelException {
- try {
- halt();
- synchronized (electionMutex) {
- if (!started)return;
- started = false;
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
- super.stop(startsvc);
- this.view = null;
- this.viewId = null;
- this.suggestedView = null;
- this.suggestedviewId = null;
- this.membership.reset();
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
- }
- }finally {
- release();
- }
- }
-
-
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
- waitForRelease();
- super.sendMessage(destination, msg, payload);
- }
-
- public void messageReceived(ChannelMessage msg) {
- if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
- //ignore message, its an alive message
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));
-
- } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
- try {
- CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage());
- Member[] cmbr = cmsg.getMembers();
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")"));
- processCoordMessage(cmsg, msg.getAddress());
- }catch ( ChannelException x ) {
- log.error("Error processing coordination message. Could be fatal.",x);
- }
- } else {
- super.messageReceived(msg);
- }
- }
-
- public boolean accept(ChannelMessage msg) {
- return super.accept(msg);
- }
-
- public void memberAdded(Member member) {
- memberAdded(member,true);
- }
-
- public void memberAdded(Member member,boolean elect) {
- try {
- if ( membership == null ) setupMembership();
- if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
- try {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
- if (started && elect) startElection(false);
- }catch ( ChannelException x ) {
- log.error("Unable to start election when member was added.",x);
- }
- }finally {
- }
-
- }
-
- public void memberDisappeared(Member member) {
- try {
-
- membership.removeMember((MemberImpl)member);
- super.memberDisappeared(member);
- try {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
- if ( started && (isCoordinator() || isHighest()) )
- startElection(true); //to do, if a member disappears, only the coordinator can start
- }catch ( ChannelException x ) {
- log.error("Unable to start election when member was removed.",x);
- }
- }finally {
- }
- }
-
- public boolean isHighest() {
- Member local = getLocalMember(false);
- if ( membership.getMembers().length == 0 ) return true;
- else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
- }
-
- public boolean isCoordinator() {
- Member coord = getCoordinator();
- return coord != null && getLocalMember(false).equals(coord);
- }
-
- public void heartbeat() {
- try {
- MemberImpl local = (MemberImpl)getLocalMember(false);
- if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) {
- if ( isHighest() ) {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
- "Heartbeat found inconsistency, restart election"));
- startElection(true);
- }
- }
- } catch ( Exception x ){
- log.error("Unable to perform heartbeat.",x);
- } finally {
- super.heartbeat();
- }
- }
-
- /**
- * has members
- */
- public boolean hasMembers() {
-
- return membership.hasMembers();
- }
-
- /**
- * Get all current cluster members
- * @return all members or empty array
- */
- public Member[] getMembers() {
-
- return membership.getMembers();
- }
-
- /**
- *
- * @param mbr Member
- * @return Member
- */
- public Member getMember(Member mbr) {
-
- return membership.getMember(mbr);
- }
-
- /**
- * Return the member that represents this node.
- *
- * @return Member
- */
- public Member getLocalMember(boolean incAlive) {
- Member local = super.getLocalMember(incAlive);
- if ( view == null && (local != null)) setupMembership();
- return local;
- }
-
- protected synchronized void setupMembership() {
- if ( membership == null ) {
- membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false);
- }
- }
-
-
-//============================================================================================================
-// HELPER CLASSES FOR COORDINATION
-//============================================================================================================
-
-
-
-
- public static class CoordinationMessage {
- //X{A-ldr, A-src, mbrs-A,B,C,D}
- protected XByteBuffer buf;
- protected MemberImpl leader;
- protected MemberImpl source;
- protected MemberImpl[] view;
- protected UniqueId id;
- protected byte[] type;
- protected long timestamp = System.currentTimeMillis();
-
- public CoordinationMessage(XByteBuffer buf) {
- this.buf = buf;
- parse();
- }
-
- public CoordinationMessage(MemberImpl leader,
- MemberImpl source,
- MemberImpl[] view,
- UniqueId id,
- byte[] type) {
- this.buf = new XByteBuffer(4096,false);
- this.leader = leader;
- this.source = source;
- this.view = view;
- this.id = id;
- this.type = type;
- this.write();
- }
-
-
- public byte[] getHeader() {
- return NonBlockingCoordinator.COORD_HEADER;
- }
-
- public MemberImpl getLeader() {
- if ( leader == null ) parse();
- return leader;
- }
-
- public MemberImpl getSource() {
- if ( source == null ) parse();
- return source;
- }
-
- public UniqueId getId() {
- if ( id == null ) parse();
- return id;
- }
-
- public MemberImpl[] getMembers() {
- if ( view == null ) parse();
- return view;
- }
-
- public byte[] getType() {
- if (type == null ) parse();
- return type;
- }
-
- public XByteBuffer getBuffer() {
- return this.buf;
- }
-
- public void parse() {
- //header
- int offset = 16;
- //leader
- int ldrLen = buf.toInt(buf.getBytesDirect(),offset);
- offset += 4;
- byte[] ldr = new byte[ldrLen];
- System.arraycopy(buf.getBytesDirect(),offset,ldr,0,ldrLen);
- leader = MemberImpl.getMember(ldr);
- offset += ldrLen;
- //source
- int srcLen = buf.toInt(buf.getBytesDirect(),offset);
- offset += 4;
- byte[] src = new byte[srcLen];
- System.arraycopy(buf.getBytesDirect(),offset,src,0,srcLen);
- source = MemberImpl.getMember(src);
- offset += srcLen;
- //view
- int mbrCount = buf.toInt(buf.getBytesDirect(),offset);
- offset += 4;
- view = new MemberImpl[mbrCount];
- for (int i=0; i<view.length; i++ ) {
- int mbrLen = buf.toInt(buf.getBytesDirect(),offset);
- offset += 4;
- byte[] mbr = new byte[mbrLen];
- System.arraycopy(buf.getBytesDirect(), offset, mbr, 0, mbrLen);
- view[i] = MemberImpl.getMember(mbr);
- offset += mbrLen;
- }
- //id
- this.id = new UniqueId(buf.getBytesDirect(),offset,16);
- offset += 16;
- type = new byte[16];
- System.arraycopy(buf.getBytesDirect(), offset, type, 0, type.length);
- offset += 16;
-
- }
-
- public void write() {
- buf.reset();
- //header
- buf.append(COORD_HEADER,0,COORD_HEADER.length);
- //leader
- byte[] ldr = leader.getData(false,false);
- buf.append(ldr.length);
- buf.append(ldr,0,ldr.length);
- ldr = null;
- //source
- byte[] src = source.getData(false,false);
- buf.append(src.length);
- buf.append(src,0,src.length);
- src = null;
- //view
- buf.append(view.length);
- for (int i=0; i<view.length; i++ ) {
- byte[] mbr = view[i].getData(false,false);
- buf.append(mbr.length);
- buf.append(mbr,0,mbr.length);
- }
- //id
- buf.append(id.getBytes(),0,id.getBytes().length);
- buf.append(type,0,type.length);
- }
- }
-
- public void fireInterceptorEvent(InterceptorEvent event) {
- if (event instanceof CoordinationEvent &&
- ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX)
- log.info(event);
- }
-
- public static class CoordinationEvent implements InterceptorEvent {
- public static final int EVT_START = 1;
- public static final int EVT_MBR_ADD = 2;
- public static final int EVT_MBR_DEL = 3;
- public static final int EVT_START_ELECT = 4;
- public static final int EVT_PROCESS_ELECT = 5;
- public static final int EVT_MSG_ARRIVE = 6;
- public static final int EVT_PRE_MERGE = 7;
- public static final int EVT_POST_MERGE = 8;
- public static final int EVT_WAIT_FOR_MSG = 9;
- public static final int EVT_SEND_MSG = 10;
- public static final int EVT_STOP = 11;
- public static final int EVT_CONF_RX = 12;
- public static final int EVT_ELECT_ABANDONED = 13;
-
- int type;
- ChannelInterceptor interceptor;
- Member coord;
- Member[] mbrs;
- String info;
- Membership view;
- Membership suggestedView;
- public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) {
- this.type = type;
- this.interceptor = interceptor;
- this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator();
- this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers();
- this.info = info;
- this.view = ((NonBlockingCoordinator)interceptor).view;
- this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView;
- }
-
- public int getEventType() {
- return type;
- }
-
- public String getEventTypeDesc() {
- switch (type) {
- case EVT_START: return "EVT_START:"+info;
- case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
- case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
- case EVT_START_ELECT: return "EVT_START_ELECT:"+info;
- case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
- case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
- case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
- case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
- case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
- case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
- case EVT_STOP: return "EVT_STOP:"+info;
- case EVT_CONF_RX: return "EVT_CONF_RX:"+info;
- case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
- default: return "Unknown";
- }
- }
-
- public ChannelInterceptor getInterceptor() {
- return interceptor;
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer("CoordinationEvent[type=");
- buf.append(type).append("\n\tLocal:");
- Member local = interceptor.getLocalMember(false);
- buf.append(local!=null?local.getName():"").append("\n\tCoord:");
- buf.append(coord!=null?coord.getName():"").append("\n\tView:");
- buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:");
- buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:");
- buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
- buf.append(info).append("]");
- return buf.toString();
- }
- }
-
-
-
-
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.group.AbsoluteOrder;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+/**
+ * <p>Title: Auto merging leader election algorithm</p>
+ *
+ * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
+ * it also merges groups automatically when members are discovered that werent part of the
+ * </p>
+ * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
+ * </p>
+ * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
+ * to pass a token ring of the current membership.<br>
+ * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
+ * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
+ * nodes are receiving pings from all the other nodes.
+ * meaning, that node{i} receives pings from node{all}-node{i}<br>
+ * but the following could happen if a multicast problem occurs.
+ * A has members {B,C,D}<br>
+ * B has members {A,C}<br>
+ * C has members {D,E}<br>
+ * D has members {A,B,C,E}<br>
+ * E has members {A,C,D}<br>
+ * Because the default Tribes membership implementation, relies on the multicast packets to
+ * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
+ * <br>
+ * To best explain how this algorithm works, lets take the above example:
+ * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
+ * where messages overlap, as they all depend on absolute order<br>
+ * Scenario 1: A,B,C,D,E all come online at the same time
+ * Eval phase, A thinks of itself as leader, B thinks of A as leader,
+ * C thinks of itself as leader, D,E think of A as leader<br>
+ * Token phase:<br>
+ * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
+ * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
+ * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
+ * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
+ * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
+ * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
+ * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
+ * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
+ * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
+ * At this point, the state looks like<br>
+ * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
+ * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
+ * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
+ * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
+ * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
+ * <br>
+ * A message doesn't stop until it reaches its original sender, unless its dropped by a higher leader.
+ * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have
+ * arrived at the same membership and all nodes are informed of each other.<br>
+ * To synchronize the rest we simply perform the following check at A when A receives X:<br>
+ * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
+ * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
+ * When A receives X again, the token is complete. <br>
+ * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
+ * install and accept the view.
+ * </p>
+ * <p>
+ * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
+ * Lets also assume that C1 sees the following view {B,D,E}<br>
+ * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
+ * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
+ * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
+ * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
+ * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
+ * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
+ * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
+ * </p>
+ * <p>To ensure that the view gets implemented at all nodes at the same time,
+ * A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
+ * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
+ *
+ * <p>The example above, of course can be simplified with a finite statemachine:<br>
+ * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br>
+ * Maybe I'll do a state diagram :)
+ * </p>
+ * <h2>State Diagrams</h2>
+ * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
+ * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ *
+ *
+ *
+ */
+public class NonBlockingCoordinator extends ChannelInterceptorBase {
+
+ /**
+ * header for a coordination message
+ */
+ protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
+ /**
+ * Coordination request
+ */
+ protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
+ /**
+ * Coordination confirmation, for blocking installations
+ */
+ protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
+
+ /**
+ * Alive message
+ */
+ protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
+ -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
+ 74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
+ 119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
+ /**
+ * Time to wait for coordination timeout
+ */
+ protected long waitForCoordMsgTimeout = 15000;
+ /**
+ * Our current view
+ */
+ protected Membership view = null;
+ /**
+ * Out current viewId
+ */
+ protected UniqueId viewId;
+
+ /**
+ * Our nonblocking membership
+ */
+ protected Membership membership = null;
+
+ /**
+ * indicates that we are running an election
+ * and this is the one we are running
+ */
+ protected UniqueId suggestedviewId;
+ protected Membership suggestedView;
+
+ protected boolean started = false;
+ protected final int startsvc = 0xFFFF;
+
+ protected Object electionMutex = new Object();
+
+ protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
+
+ public NonBlockingCoordinator() {
+ super();
+ }
+
+//============================================================================================================
+// COORDINATION HANDLING
+//============================================================================================================
+
+ public void startElection(boolean force) throws ChannelException {
+ synchronized (electionMutex) {
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ MemberImpl[] others = (MemberImpl[])membership.getMembers();
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
+ if ( others.length == 0 ) {
+ this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
+ this.view = new Membership(local,AbsoluteOrder.comp, true);
+ this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
+ return; //the only member, no need for an election
+ }
+ if ( suggestedviewId != null ) {
+
+ if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) {
+ suggestedviewId = null;
+ suggestedView = null;
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
+ } else {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
+ }
+ return; //election already running, I'm not allowed to have two of them
+ }
+ if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
+ return; //already have this view installed
+ }
+ int prio = AbsoluteOrder.comp.compare(local,others[0]);
+ MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
+ if ( local.equals(leader) || force ) {
+ CoordinationMessage msg = createElectionMsg(local, others, leader);
+ suggestedviewId = msg.getId();
+ suggestedView = new Membership(local,AbsoluteOrder.comp,true);
+ Arrays.fill(suggestedView,msg.getMembers());
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
+ sendElectionMsg(local,others[0],msg);
+ } else {
+ try {
+ coordMsgReceived.set(false);
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
+ electionMutex.wait(waitForCoordMsgTimeout);
+ }catch ( InterruptedException x ) {
+ Thread.currentThread().interrupted();
+ }
+ if ( suggestedviewId == null && (!coordMsgReceived.get())) {
+ //no message arrived, send the coord msg
+// fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
+// startElection(true);
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
+ } else {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
+ }
+ }//end if
+
+ }
+ }
+
+ private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) {
+ Membership m = new Membership(local,AbsoluteOrder.comp,true);
+ Arrays.fill(m,others);
+ MemberImpl[] mbrs = m.getMembers();
+ m.reset();
+ CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
+ return msg;
+ }
+
+ protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")"));
+ super.sendMessage(new Member[] {next}, createData(msg, local), null);
+ }
+
+ protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException {
+ int next = Arrays.nextIndex(local,msg.getMembers());
+ int current = next;
+ msg.leader = msg.getMembers()[0];
+ boolean sent = false;
+ while ( !sent && current >= 0 ) {
+ try {
+ sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
+ sent = true;
+ }catch ( ChannelException x ) {
+ log.warn("Unable to send election message to:"+msg.getMembers()[current]);
+ current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
+ if ( current == next ) throw x;
+ }
+ }
+ }
+
+ public Member getNextInLine(MemberImpl local, MemberImpl[] others) {
+ MemberImpl result = null;
+ for ( int i=0; i<others.length; i++ ) {
+
+ }
+ return result;
+ }
+
+ public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
+ msg.write();
+ ChannelData data = new ChannelData(true);
+ data.setAddress(local);
+ data.setMessage(msg.getBuffer());
+ data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
+ data.setTimestamp(System.currentTimeMillis());
+ return data;
+ }
+
+ protected void viewChange(UniqueId viewId, Member[] view) {
+ //invoke any listeners
+ }
+
+ protected boolean alive(Member mbr) {
+ return TcpFailureDetector.memberAlive(mbr,
+ COORD_ALIVE,
+ false,
+ false,
+ waitForCoordMsgTimeout,
+ waitForCoordMsgTimeout,
+ getOptionFlag());
+ }
+
+ protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ Membership merged = new Membership(local,AbsoluteOrder.comp,true);
+ Arrays.fill(merged,msg.getMembers());
+ Arrays.fill(merged,getMembers());
+ Member[] diff = Arrays.diff(merged,membership,local);
+ for ( int i=0; i<diff.length; i++ ) {
+ if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
+ else memberAdded(diff[i],false);
+ }
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
+ return merged;
+ }
+
+ protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
+ if ( !coordMsgReceived.get() ) {
+ coordMsgReceived.set(true);
+ synchronized (electionMutex) { electionMutex.notifyAll();}
+ }
+ msg.timestamp = System.currentTimeMillis();
+ Membership merged = mergeOnArrive(msg, sender);
+ if (isViewConf(msg)) handleViewConf(msg, sender, merged);
+ else handleToken(msg, sender, merged);
+ ClassLoader loader;
+
+ }
+
+ protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ if ( local.equals(msg.getSource()) ) {
+ //my message msg.src=local
+ handleMyToken(local, msg, sender,merged);
+ } else {
+ handleOtherToken(local, msg, sender,merged);
+ }
+ }
+
+ protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+ if ( local.equals(msg.getLeader()) ) {
+ //no leadership change
+ if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
+ msg.type = COORD_CONF;
+ super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null);
+ handleViewConf(msg,local,merged);
+ } else {
+ //membership change
+ suggestedView = new Membership(local,AbsoluteOrder.comp,true);
+ suggestedviewId = msg.getId();
+ Arrays.fill(suggestedView,merged.getMembers());
+ msg.view = (MemberImpl[])merged.getMembers();
+ sendElectionMsgToNextInline(local,msg);
+ }
+ } else {
+ //leadership change
+ suggestedView = null;
+ suggestedviewId = null;
+ msg.view = (MemberImpl[])merged.getMembers();
+ sendElectionMsgToNextInline(local,msg);
+ }
+ }
+
+ protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+ if ( local.equals(msg.getLeader()) ) {
+ //I am the new leader
+ //startElection(false);
+ } else {
+ msg.view = (MemberImpl[])merged.getMembers();
+ sendElectionMsgToNextInline(local,msg);
+ }
+ }
+
+ protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+ if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view
+ view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
+ Arrays.fill(view,msg.getMembers());
+ viewId = msg.getId();
+
+ if ( viewId.equals(suggestedviewId) ) {
+ suggestedView = null;
+ suggestedviewId = null;
+ }
+
+ if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) {
+ suggestedView = null;
+ suggestedviewId = null;
+ }
+
+ viewChange(viewId,view.getMembers());
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
+
+ if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
+ startElection(false);
+ }
+ }
+
+ protected boolean isViewConf(CoordinationMessage msg) {
+ return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
+ }
+
+ protected boolean hasHigherPriority(Member[] complete, Member[] local) {
+ if ( local == null || local.length == 0 ) return false;
+ if ( complete == null || complete.length == 0 ) return true;
+ AbsoluteOrder.absoluteOrder(complete);
+ AbsoluteOrder.absoluteOrder(local);
+ return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0);
+
+ }
+
+
+ /**
+ * Returns coordinator if one is available
+ * @return Member
+ */
+ public Member getCoordinator() {
+ return (view != null && view.hasMembers()) ? view.getMembers()[0] : null;
+ }
+
+ public Member[] getView() {
+ return (view != null && view.hasMembers()) ? view.getMembers() : new Member[0];
+ }
+
+ public UniqueId getViewId() {
+ return viewId;
+ }
+
+ /**
+ * Block in/out messages while a election is going on
+ */
+ protected void halt() {
+
+ }
+
+ /**
+ * Release lock for in/out messages election is completed
+ */
+ protected void release() {
+
+ }
+
+ /**
+ * Wait for an election to end
+ */
+ protected void waitForRelease() {
+
+ }
+
+
+//============================================================================================================
+// OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE
+//============================================================================================================
+ public void start(int svc) throws ChannelException {
+ if (membership == null) setupMembership();
+ if (started)return;
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
+ super.start(startsvc);
+ started = true;
+ if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
+ startElection(false);
+ }
+
+ public void stop(int svc) throws ChannelException {
+ try {
+ halt();
+ synchronized (electionMutex) {
+ if (!started)return;
+ started = false;
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
+ super.stop(startsvc);
+ this.view = null;
+ this.viewId = null;
+ this.suggestedView = null;
+ this.suggestedviewId = null;
+ this.membership.reset();
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
+ }
+ }finally {
+ release();
+ }
+ }
+
+
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ waitForRelease();
+ super.sendMessage(destination, msg, payload);
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
+ //ignore message, its an alive message
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));
+
+ } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
+ try {
+ CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage());
+ Member[] cmbr = cmsg.getMembers();
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")"));
+ processCoordMessage(cmsg, msg.getAddress());
+ }catch ( ChannelException x ) {
+ log.error("Error processing coordination message. Could be fatal.",x);
+ }
+ } else {
+ super.messageReceived(msg);
+ }
+ }
+
+ public boolean accept(ChannelMessage msg) {
+ return super.accept(msg);
+ }
+
+ public void memberAdded(Member member) {
+ memberAdded(member,true);
+ }
+
+ public void memberAdded(Member member,boolean elect) {
+ try {
+ if ( membership == null ) setupMembership();
+ if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
+ try {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
+ if (started && elect) startElection(false);
+ }catch ( ChannelException x ) {
+ log.error("Unable to start election when member was added.",x);
+ }
+ }finally {
+ }
+
+ }
+
+ public void memberDisappeared(Member member) {
+ try {
+
+ membership.removeMember((MemberImpl)member);
+ super.memberDisappeared(member);
+ try {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
+ if ( started && (isCoordinator() || isHighest()) )
+ startElection(true); //to do, if a member disappears, only the coordinator can start
+ }catch ( ChannelException x ) {
+ log.error("Unable to start election when member was removed.",x);
+ }
+ }finally {
+ }
+ }
+
+ public boolean isHighest() {
+ Member local = getLocalMember(false);
+ if ( membership.getMembers().length == 0 ) return true;
+ else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
+ }
+
+ public boolean isCoordinator() {
+ Member coord = getCoordinator();
+ return coord != null && getLocalMember(false).equals(coord);
+ }
+
+ public void heartbeat() {
+ try {
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) {
+ if ( isHighest() ) {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
+ "Heartbeat found inconsistency, restart election"));
+ startElection(true);
+ }
+ }
+ } catch ( Exception x ){
+ log.error("Unable to perform heartbeat.",x);
+ } finally {
+ super.heartbeat();
+ }
+ }
+
+ /**
+ * has members
+ */
+ public boolean hasMembers() {
+
+ return membership.hasMembers();
+ }
+
+ /**
+ * Get all current cluster members
+ * @return all members or empty array
+ */
+ public Member[] getMembers() {
+
+ return membership.getMembers();
+ }
+
+ /**
+ *
+ * @param mbr Member
+ * @return Member
+ */
+ public Member getMember(Member mbr) {
+
+ return membership.getMember(mbr);
+ }
+
+ /**
+ * Return the member that represents this node.
+ *
+ * @return Member
+ */
+ public Member getLocalMember(boolean incAlive) {
+ Member local = super.getLocalMember(incAlive);
+ if ( view == null && (local != null)) setupMembership();
+ return local;
+ }
+
+ protected synchronized void setupMembership() {
+ if ( membership == null ) {
+ membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false);
+ }
+ }
+
+
+//============================================================================================================
+// HELPER CLASSES FOR COORDINATION
+//============================================================================================================
+
+
+
+
+ public static class CoordinationMessage {
+ //X{A-ldr, A-src, mbrs-A,B,C,D}
+ protected XByteBuffer buf;
+ protected MemberImpl leader;
+ protected MemberImpl source;
+ protected MemberImpl[] view;
+ protected UniqueId id;
+ protected byte[] type;
+ protected long timestamp = System.currentTimeMillis();
+
+ public CoordinationMessage(XByteBuffer buf) {
+ this.buf = buf;
+ parse();
+ }
+
+ public CoordinationMessage(MemberImpl leader,
+ MemberImpl source,
+ MemberImpl[] view,
+ UniqueId id,
+ byte[] type) {
+ this.buf = new XByteBuffer(4096,false);
+ this.leader = leader;
+ this.source = source;
+ this.view = view;
+ this.id = id;
+ this.type = type;
+ this.write();
+ }
+
+
+ public byte[] getHeader() {
+ return NonBlockingCoordinator.COORD_HEADER;
+ }
+
+ public MemberImpl getLeader() {
+ if ( leader == null ) parse();
+ return leader;
+ }
+
+ public MemberImpl getSource() {
+ if ( source == null ) parse();
+ return source;
+ }
+
+ public UniqueId getId() {
+ if ( id == null ) parse();
+ return id;
+ }
+
+ public MemberImpl[] getMembers() {
+ if ( view == null ) parse();
+ return view;
+ }
+
+ public byte[] getType() {
+ if (type == null ) parse();
+ return type;
+ }
+
+ public XByteBuffer getBuffer() {
+ return this.buf;
+ }
+
+ public void parse() {
+ //header
+ int offset = 16;
+ //leader
+ int ldrLen = buf.toInt(buf.getBytesDirect(),offset);
+ offset += 4;
+ byte[] ldr = new byte[ldrLen];
+ System.arraycopy(buf.getBytesDirect(),offset,ldr,0,ldrLen);
+ leader = MemberImpl.getMember(ldr);
+ offset += ldrLen;
+ //source
+ int srcLen = buf.toInt(buf.getBytesDirect(),offset);
+ offset += 4;
+ byte[] src = new byte[srcLen];
+ System.arraycopy(buf.getBytesDirect(),offset,src,0,srcLen);
+ source = MemberImpl.getMember(src);
+ offset += srcLen;
+ //view
+ int mbrCount = buf.toInt(buf.getBytesDirect(),offset);
+ offset += 4;
+ view = new MemberImpl[mbrCount];
+ for (int i=0; i<view.length; i++ ) {
+ int mbrLen = buf.toInt(buf.getBytesDirect(),offset);
+ offset += 4;
+ byte[] mbr = new byte[mbrLen];
+ System.arraycopy(buf.getBytesDirect(), offset, mbr, 0, mbrLen);
+ view[i] = MemberImpl.getMember(mbr);
+ offset += mbrLen;
+ }
+ //id
+ this.id = new UniqueId(buf.getBytesDirect(),offset,16);
+ offset += 16;
+ type = new byte[16];
+ System.arraycopy(buf.getBytesDirect(), offset, type, 0, type.length);
+ offset += 16;
+
+ }
+
+ public void write() {
+ buf.reset();
+ //header
+ buf.append(COORD_HEADER,0,COORD_HEADER.length);
+ //leader
+ byte[] ldr = leader.getData(false,false);
+ buf.append(ldr.length);
+ buf.append(ldr,0,ldr.length);
+ ldr = null;
+ //source
+ byte[] src = source.getData(false,false);
+ buf.append(src.length);
+ buf.append(src,0,src.length);
+ src = null;
+ //view
+ buf.append(view.length);
+ for (int i=0; i<view.length; i++ ) {
+ byte[] mbr = view[i].getData(false,false);
+ buf.append(mbr.length);
+ buf.append(mbr,0,mbr.length);
+ }
+ //id
+ buf.append(id.getBytes(),0,id.getBytes().length);
+ buf.append(type,0,type.length);
+ }
+ }
+
+ public void fireInterceptorEvent(InterceptorEvent event) {
+ if (event instanceof CoordinationEvent &&
+ ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX)
+ log.info(event);
+ }
+
+ public static class CoordinationEvent implements InterceptorEvent {
+ public static final int EVT_START = 1;
+ public static final int EVT_MBR_ADD = 2;
+ public static final int EVT_MBR_DEL = 3;
+ public static final int EVT_START_ELECT = 4;
+ public static final int EVT_PROCESS_ELECT = 5;
+ public static final int EVT_MSG_ARRIVE = 6;
+ public static final int EVT_PRE_MERGE = 7;
+ public static final int EVT_POST_MERGE = 8;
+ public static final int EVT_WAIT_FOR_MSG = 9;
+ public static final int EVT_SEND_MSG = 10;
+ public static final int EVT_STOP = 11;
+ public static final int EVT_CONF_RX = 12;
+ public static final int EVT_ELECT_ABANDONED = 13;
+
+ int type;
+ ChannelInterceptor interceptor;
+ Member coord;
+ Member[] mbrs;
+ String info;
+ Membership view;
+ Membership suggestedView;
+ public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) {
+ this.type = type;
+ this.interceptor = interceptor;
+ this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator();
+ this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers();
+ this.info = info;
+ this.view = ((NonBlockingCoordinator)interceptor).view;
+ this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView;
+ }
+
+ public int getEventType() {
+ return type;
+ }
+
+ public String getEventTypeDesc() {
+ switch (type) {
+ case EVT_START: return "EVT_START:"+info;
+ case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
+ case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
+ case EVT_START_ELECT: return "EVT_START_ELECT:"+info;
+ case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
+ case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
+ case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
+ case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
+ case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
+ case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
+ case EVT_STOP: return "EVT_STOP:"+info;
+ case EVT_CONF_RX: return "EVT_CONF_RX:"+info;
+ case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
+ default: return "Unknown";
+ }
+ }
+
+ public ChannelInterceptor getInterceptor() {
+ return interceptor;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("CoordinationEvent[type=");
+ buf.append(type).append("\n\tLocal:");
+ Member local = interceptor.getLocalMember(false);
+ buf.append(local!=null?local.getName():"").append("\n\tCoord:");
+ buf.append(coord!=null?coord.getName():"").append("\n\tView:");
+ buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:");
+ buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:");
+ buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
+ buf.append(info).append("]");
+ return buf.toString();
+ }
+ }
+
+
+
+
+
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org