You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/23 22:23:51 UTC
svn commit: r409000 - in /tomcat/container/tc5.5.x/modules/groupcom: doc/
src/share/org/apache/catalina/tribes/group/interceptors/
src/share/org/apache/catalina/tribes/util/
Author: fhanik
Date: Tue May 23 13:23:49 2006
New Revision: 409000
URL: http://svn.apache.org/viewvc?rev=409000&view=rev
Log:
Slowly implementing to match the state diagram
Modified:
tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia
tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
Binary files - no diff available.
Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
Binary files - no diff available.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue May 23 13:23:49 2006
@@ -30,6 +30,7 @@
import org.apache.catalina.tribes.Channel;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import org.apache.catalina.tribes.membership.*;
/**
* <p>Title: NonBlockingCoordinator</p>
@@ -132,6 +133,17 @@
protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
/**
+ * Alive message
+ */
+ protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
+ -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
+ 74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
+ 119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
+ /**
+ * Time to wait for coordination timeout
+ */
+ protected long waitForCoordMsgTimeout = 5000;
+ /**
* Our current view
*/
protected Membership view = null;
@@ -163,17 +175,11 @@
}
public void start(int svc) throws ChannelException {
- try {
- halt();
- if ( started ) return;
- super.start(startsvc);
- started = true;
-
- }finally {
- release();
- }
- //coordination can happen before this line of code executes
- Member local = getLocalMember(false);
+ if ( membership == null ) setupMembership();
+ if (started)return;
+ super.start(startsvc);
+ startElection(false);
+ started = true;
}
public void stop(int svc) throws ChannelException {
@@ -191,78 +197,82 @@
return (Membership)rotatingViews.get(id);
}
- public void elect() {
+ public void startElection(boolean force) throws ChannelException {
synchronized (electionMutex) {
- try {
- Member[] mbrs = super.getMembers();
- //no members, exit
- if ( mbrs.length == 0 ) return;
- AbsoluteOrder.absoluteOrder(mbrs);
- MemberImpl local = (MemberImpl)getLocalMember(false);
- //I'm not the higest, exit
- if ( !local.equals(mbrs[0]) ) return;
- //I'm already running an election
- if ( suggestedviewId != null ) return;
- //create a suggestedview
- suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true));
- Membership suggestedview = new Membership((MemberImpl)local,AbsoluteOrder.comp);
- rotatingViews.put(suggestedviewId,suggestedview);
- suggestedview.addMember((MemberImpl)local);
- Arrays.fill(suggestedview,mbrs);
- suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true));
- CoordinationMessage msg = new CoordinationMessage(local,local,suggestedview.getMembers(),suggestedviewId,COORD_REQUEST);
- for (int i=0; i<mbrs.length; i++ ) {
- try {
- sendMessage(msg,mbrs[i]);
- break;
- } catch ( ChannelException x ) {
- log.error("Unable to send election message, trying next node.",x);
- }
+ if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ MemberImpl[] others = (MemberImpl[])membership.getMembers();
+ if ( others.length == 0 ) return; //the only member, no need for an election
+ int prio = AbsoluteOrder.comp.compare(local,others[0]);
+ MemberImpl leader = ( prio < 0 )?local:others[0];
+ if ( local.equals(leader) || force ) sendElectionMsg(local,leader,others);
+ else {
+ try {
+ electionMutex.wait(waitForCoordMsgTimeout);
+ }catch ( InterruptedException x ) {
+ Thread.currentThread().interrupted();
}
- halt();
- } finally {
- //dont release, election running
- //release will happen on processCoordMessage
- }
+ if ( rotatingViews.size() == 0 ) {
+ //no message arrived, send the coord msg
+ startElection(true);
+ }
+ }//end if
+ }
+ }
+
+ protected void sendElectionMsg(MemberImpl local, MemberImpl leader, MemberImpl[] others) throws ChannelException {
+ synchronized (electionMutex) {
+ if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them
+ Membership m = new Membership(local,AbsoluteOrder.comp);
+ m.addMember(local);
+ Arrays.fill(m,others);
+ MemberImpl[] mbrs = m.getMembers();
+ CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
+ suggestedviewId = msg.getId();
+ rotatingViews.put(suggestedviewId, msg);
+ super.sendMessage(new Member[] {others[0]}, createData(msg, local), null);
}
}
+ public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
+ ChannelData data = new ChannelData(true);
+ data.setAddress(local);
+ data.setMessage(msg.getBuffer());
+ data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
+ data.setTimestamp(System.currentTimeMillis());
+ return data;
+ }
+
protected void viewChange(UniqueId viewId, Member[] view) {
//invoke any listeners
}
+ protected boolean alive(Member mbr) {
+ return TcpFailureDetector.memberAlive(mbr,
+ COORD_ALIVE,
+ false,
+ false,
+ waitForCoordMsgTimeout,
+ waitForCoordMsgTimeout,
+ getOptionFlag());
+ }
+
+ protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
+
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ Membership merged = new Membership(local,AbsoluteOrder.comp);
+ Arrays.fill(merged,msg.getMembers());
+ Arrays.fill(merged,getMembers());
+ Member[] diff = Arrays.diff(merged,membership,local);
+ for ( int i=0; i<diff.length; i++ ) {
+ if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
+ }
+ return merged;
+ }
+
protected void processCoordMessage(CoordinationMessage msg, Member sender) {
synchronized (electionMutex) {
- MemberImpl local = (MemberImpl) getLocalMember(false);
- if (suggestedviewId != null) {
- //we are running our own election
- if (suggestedviewId.equals(msg.getId())) {
- //we received our own token
- Membership suggestedview = getView(msg.getId());
- Member[] suggested = suggestedview.getMembers();
- Member[] received = msg.getMembers();
- if (Arrays.sameMembers(suggested,received) ) {
- //we completed the loop
- view = suggestedview;
- viewId = suggestedviewId;
- suggestedviewId = null;
- rotatingViews.remove(viewId);
- suggestedview.reset();
- viewChange(viewId,view.getMembers());
- release();
- } else {
- //view or leadership changed
- if ( !local.equals(msg.getLeader()) ) {
- //leadership changed
- } else {
- //leadership didn't change
- //circulate it again
- }
- }
- }
- } else {
-
- }
+ msg.timestamp = System.currentTimeMillis();
}
}
@@ -289,7 +299,9 @@
}
public void messageReceived(ChannelMessage msg) {
- if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
+ if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
+ //ignore message, its an alive message
+ } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
processCoordMessage(new CoordinationMessage(msg.getMessage()),msg.getAddress());
} else {
super.messageReceived(msg);
@@ -302,23 +314,31 @@
public void memberAdded(Member member) {
try {
- if ( membership == null ) setupMembership();
+
if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
- halt();
+ try {
+ if (started) startElection(false);
+ }catch ( ChannelException x ) {
+ log.error("Unable to start election when member was added.",x);
+ }
}finally {
- release();
}
}
public void memberDisappeared(Member member) {
try {
- halt();
- if ( started ) elect();
+
+ membership.removeMember((MemberImpl)member);
+ super.memberDisappeared(member);
+ try {
+ if (started) startElection(false);
+ }catch ( ChannelException x ) {
+ log.error("Unable to start election when member was removed.",x);
+ }
}finally {
- release();
}
- super.memberDisappeared(member);
+
}
public void heartbeat() {
@@ -329,7 +349,7 @@
* has members
*/
public boolean hasMembers() {
- if ( membership == null ) setupMembership();
+
return membership.hasMembers();
}
@@ -338,8 +358,8 @@
* @return all members or empty array
*/
public Member[] getMembers() {
- if ( membership == null ) setupMembership();
- throw new UnsupportedOperationException("Not yet implemented");
+
+ return membership.getMembers();
}
/**
@@ -348,8 +368,8 @@
* @return Member
*/
public Member getMember(Member mbr) {
- if ( membership == null ) setupMembership();
- throw new UnsupportedOperationException("Not yet implemented");
+
+ return membership.getMember(mbr);
}
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Tue May 23 13:23:49 2006
@@ -58,7 +58,7 @@
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
- protected static byte[] testMessage = new byte[] {
+ protected static byte[] TCP_FAIL_DETECT = new byte[] {
79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
@@ -100,8 +100,8 @@
boolean process = true;
if ( okToProcess(msg.getOptions()) ) {
//check to see if it is a testMessage, if so, process = false
- process = ( (msg.getMessage().getLength() != testMessage.length) ||
- (!Arrays.equals(testMessage,msg.getMessage().getBytes()) ) );
+ process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
+ (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
}//end if
//ignore the message, it doesnt have the flag set
@@ -218,8 +218,14 @@
}
-
protected boolean memberAlive(Member mbr) {
+ return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
+ }
+
+ protected static boolean memberAlive(Member mbr, byte[] msgData,
+ boolean sendTest, boolean readTest,
+ long readTimeout, long conTimeout,
+ int optionFlag) {
//could be a shutdown notification
if ( Arrays.equals(mbr.getPayload(),Member.SHUTDOWN_PAYLOAD) ) return false;
@@ -227,20 +233,20 @@
try {
InetAddress ia = InetAddress.getByAddress(mbr.getHost());
InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
- socket.setSoTimeout((int)readTestTimeout);
- socket.connect(addr, (int) connectTimeout);
- if ( performSendTest ) {
+ socket.setSoTimeout((int)readTimeout);
+ socket.connect(addr, (int) conTimeout);
+ if ( sendTest ) {
ChannelData data = new ChannelData(true);
data.setAddress(mbr);
- data.setMessage(new XByteBuffer(testMessage,false));
+ data.setMessage(new XByteBuffer(msgData,false));
data.setTimestamp(System.currentTimeMillis());
- int options = getOptionFlag() | Channel.SEND_OPTIONS_BYTE_MESSAGE;
- if ( performReadTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
+ int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
+ if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
data.setOptions(options);
byte[] message = XByteBuffer.createDataPackage(data);
socket.getOutputStream().write(message);
- if ( performReadTest ) {
+ if ( readTest ) {
int length = socket.getInputStream().read(message);
return length > 0;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=409000&r1=408999&r2=409000&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Tue May 23 13:23:49 2006
@@ -101,6 +101,17 @@
for (int i=0; i<m.length; i++ ) mbrship.addMember((MemberImpl)m[i]);
}
+ public static Member[] diff(Membership complete, Membership local, MemberImpl ignore) {
+ ArrayList result = new ArrayList();
+ MemberImpl[] comp = complete.getMembers();
+ for ( int i=0; i<comp.length; i++ ) {
+ if ( ignore!=null && ignore.equals(comp[i]) ) continue;
+ if ( local.getMember(comp[i]) == null ) result.add(comp[i]);
+ }
+ return (MemberImpl[])result.toArray(new MemberImpl[result.size()]);
+ }
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org