You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/24 00:11:05 UTC
svn commit: r409013 -
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
Author: fhanik
Date: Tue May 23 15:11:05 2006
New Revision: 409013
URL: http://svn.apache.org/viewvc?rev=409013&view=rev
Log:
piece by piece
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=409013&r1=409012&r2=409013&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue May 23 15:11:05 2006
@@ -14,28 +14,29 @@
*/
package org.apache.catalina.tribes.group.interceptors;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.ChannelMessage;
+import java.util.LinkedHashMap;
+
+import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.util.UUIDGenerator;
import org.apache.catalina.tribes.group.AbsoluteOrder;
-import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.Channel;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import org.apache.catalina.tribes.membership.*;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.UUIDGenerator;
/**
- * <p>Title: NonBlockingCoordinator</p>
+ * <p>Title: Auto merging leader election algorithm</p>
*
- * <p>Description: Implementation of a simple coordinator algorithm.</p>
+ * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
+ * but also merges groups automatically when members are discovered that weren't part of the group.
+ * </p>
* <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
* </p>
* <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
@@ -246,7 +247,6 @@
}
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
-
MemberImpl local = (MemberImpl)getLocalMember(false);
Membership merged = new Membership(local,AbsoluteOrder.comp);
Arrays.fill(merged,msg.getMembers());
@@ -254,21 +254,26 @@
Member[] diff = Arrays.diff(merged,membership,local);
for ( int i=0; i<diff.length; i++ ) {
if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
+ else memberAdded(diff[i],false);
}
return merged;
}
- protected void processCoordMessage(CoordinationMessage msg, Member sender) {
+ protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
synchronized (electionMutex) {
msg.timestamp = System.currentTimeMillis();
rotatingViews.put(msg.getId(),msg);
Membership merged = mergeOnArrive(msg,sender);
if ( isViewConf(msg) ) handleViewConf(msg, sender,merged);
+ else handleToken(msg,sender,merged);
}
}
- protected void handleViewConf(CoordinationMessage msg, Member sender,
- Membership merged) {
+ protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+
+ }
+
+ protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
this.view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp);
this.viewId = msg.getId();
if ( viewId.equals(this.suggestedviewId) ) {
@@ -276,12 +281,24 @@
this.suggestedviewId = null;
}
this.viewChange(viewId,view.getMembers());
+ if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
+ startElection(false);
+ }
}
protected boolean isViewConf(CoordinationMessage msg) {
return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
}
+
+ protected boolean hasHigherPriority(Member[] complete, Member[] local) {
+ if ( local == null || local.length == 0 ) return false;
+ if ( complete == null || complete.length == 0 ) return true;
+ AbsoluteOrder.absoluteOrder(complete);
+ AbsoluteOrder.absoluteOrder(local);
+ return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0);
+
+ }
/**
@@ -346,7 +363,11 @@
if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
//ignore message, its an alive message
} else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
- processCoordMessage(new CoordinationMessage(msg.getMessage()),msg.getAddress());
+ try {
+ processCoordMessage(new CoordinationMessage(msg.getMessage()), msg.getAddress());
+ }catch ( ChannelException x ) {
+ log.error("Error processing coordination message. Could be fatal.",x);
+ }
} else {
super.messageReceived(msg);
}
@@ -357,11 +378,15 @@
}
public void memberAdded(Member member) {
+ memberAdded(member,true);
+ }
+
+ public void memberAdded(Member member,boolean elect) {
try {
if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
try {
- if (started) startElection(false);
+ if (started && elect) startElection(false);
}catch ( ChannelException x ) {
log.error("Unable to start election when member was added.",x);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org