You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/24 00:11:05 UTC

svn commit: r409013 - /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java

Author: fhanik
Date: Tue May 23 15:11:05 2006
New Revision: 409013

URL: http://svn.apache.org/viewvc?rev=409013&view=rev
Log:
piece by piece

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=409013&r1=409012&r2=409013&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue May 23 15:11:05 2006
@@ -14,28 +14,29 @@
  */
 package org.apache.catalina.tribes.group.interceptors;
 
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.ChannelMessage;
+import java.util.LinkedHashMap;
+
+import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.util.UUIDGenerator;
 import org.apache.catalina.tribes.group.AbsoluteOrder;
-import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.Channel;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import org.apache.catalina.tribes.membership.*;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.UUIDGenerator;
 
 /**
- * <p>Title: NonBlockingCoordinator</p>
+ * <p>Title: Auto merging leader election algorithm</p>
  *
- * <p>Description: Implementation of a simple coordinator algorithm.</p>
+ * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
+ *    but also merges groups automatically when members are discovered that weren't part of the
+ *    local membership.</p>
  * <p>This algorithm is non-blocking, meaning it allows for transactions while the coordination phase is going on.
  * </p>
  * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
@@ -246,7 +247,6 @@
     }
     
     protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
-        
         MemberImpl local = (MemberImpl)getLocalMember(false);
         Membership merged = new Membership(local,AbsoluteOrder.comp);
         Arrays.fill(merged,msg.getMembers());
@@ -254,21 +254,26 @@
         Member[] diff = Arrays.diff(merged,membership,local);
         for ( int i=0; i<diff.length; i++ ) {
             if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
+            else memberAdded(diff[i],false);
         }
         return merged;
     }
     
-    protected void processCoordMessage(CoordinationMessage msg, Member sender) {
+    protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
         synchronized (electionMutex) {
             msg.timestamp = System.currentTimeMillis();
             rotatingViews.put(msg.getId(),msg);
             Membership merged = mergeOnArrive(msg,sender);
             if ( isViewConf(msg) ) handleViewConf(msg, sender,merged);
+            else handleToken(msg,sender,merged);
         }
     }
     
-    protected void handleViewConf(CoordinationMessage msg, Member sender,
-                                  Membership merged) {
+    protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
+        
+    }
+    
+    protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
         this.view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp);
         this.viewId = msg.getId();
         if ( viewId.equals(this.suggestedviewId) ) {
@@ -276,12 +281,24 @@
             this.suggestedviewId = null;
         }
         this.viewChange(viewId,view.getMembers());
+        if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
+            startElection(false);
+        }
         
     }
     
     protected boolean isViewConf(CoordinationMessage msg) {
         return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
     }
+    
+    protected boolean hasHigherPriority(Member[] complete, Member[] local) {
+        if ( local == null || local.length == 0 ) return false;
+        if ( complete == null || complete.length == 0 ) return true;
+        AbsoluteOrder.absoluteOrder(complete);
+        AbsoluteOrder.absoluteOrder(local);
+        return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0);
+        
+    }
 
     
     /**
@@ -346,7 +363,11 @@
         if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
             //ignore message, it's an alive message
         } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
-            processCoordMessage(new CoordinationMessage(msg.getMessage()),msg.getAddress());
+            try {
+                processCoordMessage(new CoordinationMessage(msg.getMessage()), msg.getAddress());
+            }catch ( ChannelException x ) {
+                log.error("Error processing coordination message. Could be fatal.",x);
+            }
         } else {
             super.messageReceived(msg);
         }
@@ -357,11 +378,15 @@
     }
 
     public void memberAdded(Member member) {
+        memberAdded(member,true);
+    }
+
+    public void memberAdded(Member member,boolean elect) {
         try {
             
             if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
             try {
-                if (started) startElection(false);
+                if (started && elect) startElection(false);
             }catch ( ChannelException x ) {
                 log.error("Unable to start election when member was added.",x);
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org