You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/20 00:42:51 UTC

svn commit: r407923 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/group/ src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/membership/ test/java/org/apache/catalina/...

Author: fhanik
Date: Fri May 19 15:42:51 2006
New Revision: 407923

URL: http://svn.apache.org/viewvc?rev=407923&view=rev
Log:
Added in absolute order utility.
Added in the complete test for the tcp failure detector

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java?rev=407923&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java Fri May 19 15:42:51 2006
@@ -0,0 +1,99 @@
+package org.apache.catalina.tribes.group;
+
+import org.apache.catalina.tribes.Member;
+import java.util.Comparator;
+import java.util.Arrays;
+
+/**
+ * <p>Title: Membership - Absolute Order</p>
+ *
+ * <p>Description: A simple, yet agreeable and efficient way of ordering members</p>
+ * <p>
+ *    Ordering members can serve as a basis for electing a leader or coordinating efforts.<br>
+ *    The ordering is deliberately simple: it relies only on the <code>Member</code> interface
+ *    and sorts members ascending by the following keys, applied in this order
+ *    (individual bytes are compared as signed Java <code>byte</code> values):
+ *  <ol>
+ *     <li>Host length - fewer bytes sorts first, ie IPv4 addresses sort before IPv6</li>
+ *     <li>Host comparison (equal length) - byte by byte, lower byte sorts first</li>
+ *     <li>Port comparison - lower port sorts first</li>
+ *     <li>UniqueId comparison - shorter id first, then byte by byte, lower byte first</li>
+ *  </ol>
+ *     
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ * @see org.apache.catalina.tribes.Member
+ */
+public class AbsoluteOrder {
+    protected static AbsoluteComparator comp = new AbsoluteComparator(); // shared comparator passed to Arrays.sort
+    /** Protected constructor; every member this class exposes is static. */
+    protected AbsoluteOrder() {
+        super();
+    }
+
+    
+    /** Sorts <code>members</code> in place with the shared {@link AbsoluteComparator}. */
+    public static void absoluteOrder(Member[] members) {
+        Arrays.sort(members,comp);
+    }
+    
+    /** Comparator ordering members by host bytes, then port, then unique id bytes. */
+    public static class AbsoluteComparator implements Comparator {
+        public int compare(Object o1, Object o2) {
+            if ( !((o1 instanceof Member) && (o2 instanceof Member)) ) return 0; // anything that is not a Member compares as equal
+            return compareMembers((Member)o1,(Member)o2);
+        }
+        /** Compares host first, then port, then unique id; a later key is used only when the earlier ones tie. */
+        public int compareMembers(Member m1, Member m2) {
+            int result = compareIps(m1,m2);
+            if ( result == 0 ) result = comparePorts(m1,m2);
+            if ( result == 0 ) result = compareIds(m1,m2);
+            return result;
+        }
+        /** Compares the byte arrays returned by <code>getHost()</code>. */
+        public int compareIps(Member m1, Member m2) {
+            return compareBytes(m1.getHost(),m2.getHost());
+        }
+        /** Compares the values returned by <code>getPort()</code>. */
+        public int comparePorts(Member m1, Member m2) {
+            return compareInts(m1.getPort(),m2.getPort());
+        }
+        /** Compares the byte arrays returned by <code>getUniqueId()</code>. */
+        public int compareIds(Member m1, Member m2) {
+            return compareBytes(m1.getUniqueId(),m2.getUniqueId());
+        }
+        /** A shorter array sorts first; equal-length arrays are compared element by element up to the first difference. */
+        protected int compareBytes(byte[] d1, byte[] d2) {
+            int result = 0;
+            if ( d1.length == d2.length ) {
+                for (int i=0; (result==0) && (i<d1.length); i++) { // stops at the first differing byte
+                    result = compareBytes(d1[i],d2[i]);
+                }
+            } else if ( d1.length < d2.length) {
+                result = -1;
+            } else {
+                result = 1;
+            }
+            return result;
+        }
+        /** Compares two bytes as signed values by widening them to int, so negative bytes sort before positive ones. */
+        protected int compareBytes(byte b1, byte b2) {
+            return compareInts((int)b1,(int)b2);
+        }
+        /** Three-way comparison: returns -1 if b1 is smaller, 1 if it is larger, 0 if both are equal. */
+        protected int compareInts(int b1, int b2) {
+            int result = 0;
+            if ( b1 == b2 ) {
+                // equal: result stays 0
+            } else if ( b1 < b2) {
+                result = -1;
+            } else {
+                result = 1;
+            }
+            return result;
+        }
+    }
+    
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=407923&r1=407922&r2=407923&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Fri May 19 15:42:51 2006
@@ -117,12 +117,12 @@
             removeSuspects.remove(member);
         } else {
             //if we add it here, then add it upwards too
-            if ( membership.memberAlive((MemberImpl)member) ) {
+            if ( membership.getMember((MemberImpl)member) == null) {
                 //check to see if it is alive
                 if (memberAlive(member)) {
+                    membership.memberAlive((MemberImpl)member);
                     super.memberAdded(member);
                 } else {
-                    membership.removeMember((MemberImpl)member);
                     addSuspects.put(member, new Long(System.currentTimeMillis()));
                 }
             }        

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=407923&r1=407922&r2=407923&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri May 19 15:42:51 2006
@@ -178,6 +178,8 @@
             socket.setTimeToLive(mcastTTL);
         }
     }
+    
+    
 
     /**
      * Start the service

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java?rev=407923&r1=407922&r2=407923&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java Fri May 19 15:42:51 2006
@@ -34,8 +34,10 @@
         super.setUp();
         channel1 = new GroupChannel();
         channel2 = new GroupChannel();
-        mbrlist1 = new TestMbrListener();
-        mbrlist2 = new TestMbrListener();
+        channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII"));
+        channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII"));
+        mbrlist1 = new TestMbrListener("Channel-1");
+        mbrlist2 = new TestMbrListener("Channel-2");
         tcpFailureDetector1 = new TcpFailureDetector();
         tcpFailureDetector2 = new TcpFailureDetector();
         channel1.addInterceptor(tcpFailureDetector1);
@@ -44,12 +46,19 @@
         channel2.addMembershipListener(mbrlist2);
     }
     
+    public void clear() {
+        mbrlist1.members.clear();
+        mbrlist2.members.clear();
+    }
     
     public void testTcpSendFailureMemberDrop() throws Exception {
+        System.out.println("testTcpSendFailureMemberDrop()");
+        clear();
         channel1.start(channel1.DEFAULT);
         channel2.start(channel2.DEFAULT);
-        channel2.stop(channel2.SND_RX_SEQ);
+        Thread.sleep(1000);
         assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+        channel2.stop(channel2.SND_RX_SEQ);
         ByteMessage msg = new ByteMessage(new byte[1024]);
         try {
             channel1.send(channel1.getMembers(), msg, 0);
@@ -62,9 +71,27 @@
         channel2.stop(Channel.DEFAULT);
     }
     
+    public void testTcpFailureMemberAdd() throws Exception {
+        System.out.println("testTcpFailureMemberAdd()");
+        clear();
+        channel1.start(channel1.DEFAULT);
+        channel2.start(channel2.SND_RX_SEQ);
+        channel2.start(channel2.SND_TX_SEQ);
+        channel2.start(channel2.MBR_RX_SEQ);
+        channel2.stop(channel2.SND_RX_SEQ);
+        channel2.start(channel2.MBR_TX_SEQ);
+        Thread.sleep(1000);
+        assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+        channel1.stop(Channel.DEFAULT);
+        channel2.stop(Channel.DEFAULT);
+    }
+
     public void testTcpMcastFail() throws Exception {
+        System.out.println("testTcpMcastFail()");
+        clear();
         channel1.start(channel1.DEFAULT);
         channel2.start(channel2.DEFAULT);
+        Thread.sleep(1000);
         assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
         channel2.stop(channel2.MBR_TX_SEQ);
         ByteMessage msg = new ByteMessage(new byte[1024]);
@@ -91,13 +118,31 @@
     }
     
     public class TestMbrListener implements MembershipListener {
+        public String name = null;
+        public TestMbrListener(String name) {
+            this.name = name;
+        }
         public ArrayList members = new ArrayList();
         public void memberAdded(Member member) {
-            if ( !members.contains(member) ) members.add(member);
+            if ( !members.contains(member) ) {
+                members.add(member);
+                try{
+                    System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]");
+                }catch ( Exception x ) {
+                    System.out.println(name + ":member added[unknown]");
+                }
+            }
         }
         
         public void memberDisappeared(Member member) {
-            if ( members.contains(member) ) members.remove(member);
+            if ( members.contains(member) ) {
+                members.remove(member);
+                try{
+                    System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]");
+                }catch ( Exception x ) {
+                    System.out.println(name + ":member disappeared[unknown]");
+                }
+            }
         }
         
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407923&r1=407922&r2=407923&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri May 19 15:42:51 2006
@@ -42,6 +42,15 @@
 Code Tasks:
 ===========================================
 
+9. CoordinatorInterceptor - manages the selection of a cluster coordinator
+   just had a brilliant idea, if GroupChannel keeps its own view of members,
+   the coordinator interceptor can hold on to the member added/disappeared event
+   It can also intercept down going messages if the coordinator disappeared
+   while a new coordinator is chosen
+   It can also intercept down going messages for disappeared members that the 
+   calling app does not yet know about, to avoid a ChannelException
+   The coordinator is needed because of the mixup when two channels start up instantly
+
 48. Periodic refresh of the replicated map (primary ->backup)
 
 47. Delta(session) versioning. increase version number each time, easier to keep maps in sync
@@ -136,15 +145,6 @@
    (This is useful when synchronized=false and waitForAck=false, to improve
    parallel processing, but you want to have all messages sent in parallel and
    don't return until all have been processed on the remote end.)
-
-9. CoordinatorInterceptor - manages the selection of a cluster coordinator
-   just had a brilliant idea, if GroupChannel keeps its own view of members,
-   the coordinator interceptor can hold on to the member added/disappared event
-   It can also intercept down going messages if the coordinator disappeared
-   while a new coordinator is chosen
-   It can also intercept down going messages for members disappeared that the 
-   calling app not yet knows about, to avoid a ChannelException
-
 
 10. Xa2PhaseCommitInterceptor - make sure the message doesn't reach the receiver unless all members got it
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org