You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/06/14 19:47:01 UTC

svn commit: r414323 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/transport/ src/share/org/apache/catalina/tribes/transport/bio/ src/share/org/apache/catalina/tribes/transport/nio/ test/java/org/apache/catalina/tribes/demos/

Author: fhanik
Date: Wed Jun 14 10:47:00 2006
New Revision: 414323

URL: http://svn.apache.org/viewvc?rev=414323&view=rev
Log:
Further enhancements, looking pretty good

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Wed Jun 14 10:47:00 2006
@@ -192,7 +192,14 @@
                 return; //the only member, no need for an election
             }
             if ( suggestedviewId != null ) {
-                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
+                
+                if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 &&  Arrays.diff(suggestedView,view,local).length == 0) {
+                    suggestedviewId = null;
+                    suggestedView = null;
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
+                } else {
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
+                }
                 return; //election already running, I'm not allowed to have two of them
             }
             if ( view != null && Arrays.diff(view,membership,local).length == 0 &&  Arrays.diff(membership,view,local).length == 0) {
@@ -218,10 +225,11 @@
                 }
                 if ( suggestedviewId == null && (!coordMsgReceived.get())) {
                     //no message arrived, send the coord msg
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
-                    startElection(true);
+//                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
+//                    startElection(true);
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
                 } else {
-                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned"));
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
                 }
             }//end if
             
@@ -307,7 +315,6 @@
     }
     
     protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
-//        synchronized (electionMutex) {
             coordMsgReceived.set(true);
             msg.timestamp = System.currentTimeMillis();
             Membership merged = mergeOnArrive(msg,sender);
@@ -440,7 +447,6 @@
 //              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
 //============================================================================================================
     public void start(int svc) throws ChannelException {
-//        synchronized (electionMutex) {
             if (membership == null) setupMembership();
             if (started)return;
             fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
@@ -449,7 +455,6 @@
             if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
             fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
             startElection(false);
-//        }
     }
 
     public void stop(int svc) throws ChannelException {
@@ -527,9 +532,8 @@
             super.memberDisappeared(member);
             try {
                 fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
-                if ( started && (isCoordinator() || member.equals(getCoordinator())) )
-                    startElection(false);
-                //to do, if a member disappears, only the coordinator can start
+                if ( started && (isCoordinator() || isHighest()) ) 
+                    startElection(true); //to do, if a member disappears, only the coordinator can start
             }catch ( ChannelException x ) {
                 log.error("Unable to start election when member was removed.",x);
             }
@@ -537,13 +541,32 @@
         }
     }
     
+    public boolean isHighest() {
+        Member local = getLocalMember(false);
+        if ( membership.getMembers().length == 0 ) return true;
+        else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
+    }
+    
     public boolean isCoordinator() {
         Member coord = getCoordinator();
         return coord != null && getLocalMember(false).equals(coord);
     }
 
     public void heartbeat() {
-        super.heartbeat();
+        try {
+            MemberImpl local = (MemberImpl)getLocalMember(false);
+            if ( view != null && (Arrays.diff(view,membership,local).length != 0 ||  Arrays.diff(membership,view,local).length != 0) ) {
+                if ( isHighest() ) {
+                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
+                                                               "Heartbeat found inconsistency, restart election"));
+                    startElection(true);
+                }            
+            }
+        } catch ( Exception x  ){
+            log.error("Unable to perform heartbeat.",x);
+        } finally {
+            super.heartbeat();
+        }
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Wed Jun 14 10:47:00 2006
@@ -111,39 +111,45 @@
     }//messageReceived
     
     
-    public synchronized void memberAdded(Member member) {
+    public void memberAdded(Member member) {
         if ( membership == null ) setupMembership();
-        if ( removeSuspects.containsKey(member) ) {
-            //previously marked suspect, system below picked up the member again
-            removeSuspects.remove(member);
-        } else {
-            //if we add it here, then add it upwards too
-            if ( membership.getMember((MemberImpl)member) == null) {
+        boolean notify = false;
+        synchronized (membership) {
+            if (removeSuspects.containsKey(member)) {
+                //previously marked suspect, system below picked up the member again
+                removeSuspects.remove(member);
+            } else if (membership.getMember( (MemberImpl) member) == null){
+                //if we add it here, then add it upwards too
                 //check to see if it is alive
                 if (memberAlive(member)) {
-                    membership.memberAlive((MemberImpl)member);
-                    super.memberAdded(member);
+                    membership.memberAlive( (MemberImpl) member);
+                    notify = true;
                 } else {
                     addSuspects.put(member, new Long(System.currentTimeMillis()));
                 }
-            }        
+            }
         }
+        if ( notify ) super.memberAdded(member);
     }
 
-    public synchronized void memberDisappeared(Member member) {
+    public void memberDisappeared(Member member) {
         if ( membership == null ) setupMembership();
+        boolean notify = false;
         boolean shutdown = Arrays.equals(member.getPayload(),Member.SHUTDOWN_PAYLOAD);
         if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
-        //check to see if the member really is gone
-        //if the payload is not a shutdown message
-        if ( shutdown || !memberAlive(member) ) {
-            //not correct, we need to maintain the map
-            membership.removeMember((MemberImpl)member);
-            super.memberDisappeared(member);
-        } else {
-            //add the member as suspect
-            removeSuspects.put(member, new Long(System.currentTimeMillis()));
+        synchronized (membership) {
+            //check to see if the member really is gone
+            //if the payload is not a shutdown message
+            if (shutdown || !memberAlive(member)) {
+                //not correct, we need to maintain the map
+                membership.removeMember( (MemberImpl) member);
+                notify = true;
+            } else {
+                //add the member as suspect
+                removeSuspects.put(member, new Long(System.currentTimeMillis()));
+            }
         }
+        if ( notify ) super.memberDisappeared(member);
     }
     
     public boolean hasMembers() {
@@ -165,47 +171,48 @@
         return super.getLocalMember(incAlive);
     }
     
-    public synchronized void heartbeat() {
+    public void heartbeat() {
         try {
             if (membership == null) setupMembership();
-            //update all alive times
-            Member[] members = super.getMembers();
-            for (int i = 0; members != null && i < members.length; i++) {
-                if (membership.memberAlive( (MemberImpl) members[i])) {
-                    //we don't have this one in our membership, check to see if he/she is alive
-                    if (memberAlive(members[i])) {
-                        log.warn("Member added, even though we werent notified:" + members[i]);
-                        super.memberAdded(members[i]);
-                    } else {
-                        membership.removeMember( (MemberImpl) members[i]);
+            synchronized (membership) {
+                //update all alive times
+                Member[] members = super.getMembers();
+                for (int i = 0; members != null && i < members.length; i++) {
+                    if (membership.memberAlive( (MemberImpl) members[i])) {
+                        //we don't have this one in our membership, check to see if he/she is alive
+                        if (memberAlive(members[i])) {
+                            log.warn("Member added, even though we werent notified:" + members[i]);
+                            super.memberAdded(members[i]);
+                        } else {
+                            membership.removeMember( (MemberImpl) members[i]);
+                        } //end if
                     } //end if
-                } //end if
-            } //for
+                } //for
 
-            //check suspect members if they are still alive,
-            //if not, simply issue the memberDisappeared message
-            MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
-            for (int i = 0; i < keys.length; i++) {
-                MemberImpl m = (MemberImpl) keys[i];
-                if (membership.getMember(m) != null && (!memberAlive(m))) {
-                    membership.removeMember(m);
-                    super.memberDisappeared(m);
-                    removeSuspects.remove(m);
-                } //end if
-            }
+                //check suspect members if they are still alive,
+                //if not, simply issue the memberDisappeared message
+                MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+                for (int i = 0; i < keys.length; i++) {
+                    MemberImpl m = (MemberImpl) keys[i];
+                    if (membership.getMember(m) != null && (!memberAlive(m))) {
+                        membership.removeMember(m);
+                        super.memberDisappeared(m);
+                        removeSuspects.remove(m);
+                    } //end if
+                }
 
-            //check add suspects members if they are alive now,
-            //if they are, simply issue the memberAdded message
-            keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
-            for (int i = 0; i < keys.length; i++) {
-                MemberImpl m = (MemberImpl) keys[i];
-                if ( membership.getMember(m) == null && (memberAlive(m))) {
-                    membership.memberAlive(m);
-                    super.memberAdded(m);
-                    addSuspects.remove(m);
-                } //end if
+                //check add suspects members if they are alive now,
+                //if they are, simply issue the memberAdded message
+                keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+                for (int i = 0; i < keys.length; i++) {
+                    MemberImpl m = (MemberImpl) keys[i];
+                    if ( membership.getMember(m) == null && (memberAlive(m))) {
+                        membership.memberAlive(m);
+                        super.memberAdded(m);
+                        addSuspects.remove(m);
+                    } //end if
+                }
             }
-
         }catch ( Exception x ) {
             log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
         } finally {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Wed Jun 14 10:47:00 2006
@@ -177,10 +177,11 @@
             }catch ( IOException x) {
                 retries--;
                 if ( retries <= 0 ) {
-                    log.info("Unable to bind server socket to "+addr+" throwing error.");
+                    log.info("Unable to bind server socket to:"+addr+" throwing error.");
                     throw x;
                 }
                 portstart++;
+                try {Thread.sleep(25);}catch( InterruptedException ti){Thread.currentThread().interrupted();}
                 retries = bind(socket,portstart,retries);
             }
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Wed Jun 14 10:47:00 2006
@@ -55,9 +55,10 @@
     public void start() throws IOException {
         try {
             setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this));
-        } catch (Exception e) {
-            log.error("ThreadPool can initilzed. Listener not started", e);
-            return;
+        } catch (Exception x) {
+            log.fatal("ThreadPool can initilzed. Listener not started", x);
+            if ( x instanceof IOException ) throw (IOException)x;
+            else throw new IOException(x.getMessage());
         }
         try {
             getBind();
@@ -67,6 +68,8 @@
             t.start();
         } catch (Exception x) {
             log.fatal("Unable to start cluster receiver", x);
+            if ( x instanceof IOException ) throw (IOException)x;
+            else throw new IOException(x.getMessage());
         }
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Wed Jun 14 10:47:00 2006
@@ -83,12 +83,13 @@
      * @throws Exception
      * @see org.apache.catalina.tribes.ClusterReceiver#start()
      */
-    public void start() {
+    public void start() throws IOException {
         try {
             setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
-        } catch (Exception e) {
-            log.error("ThreadPool can initilzed. Listener not started", e);
-            return;
+        } catch (Exception x) {
+            log.fatal("ThreadPool can initilzed. Listener not started", x);
+            if ( x instanceof IOException ) throw (IOException)x;
+            else throw new IOException(x.getMessage());
         }
         try {
             getBind();
@@ -98,6 +99,8 @@
             t.start();
         } catch (Exception x) {
             log.fatal("Unable to start cluster receiver", x);
+            if ( x instanceof IOException ) throw (IOException)x;
+            else throw new IOException(x.getMessage());
         }
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Wed Jun 14 10:47:00 2006
@@ -21,7 +21,9 @@
     static int CHANNEL_COUNT = 5;
     static int SCREEN_WIDTH = 120;
     static long SLEEP_TIME = 10;
+    static int CLEAR_SCREEN = 30;
     static boolean MULTI_THREAD = false;
+    static boolean[] VIEW_EVENTS = new boolean[255];
     StringBuffer statusLine = new StringBuffer();
     Status[] status = null;
     BufferedReader reader = null;
@@ -36,7 +38,7 @@
     
     public void clearScreen() {
         StringBuffer buf = new StringBuffer(700);
-        for (int i=0; i<30; i++ ) buf.append("\n");
+        for (int i=0; i<CLEAR_SCREEN; i++ ) buf.append("\n");
         System.out.println(buf);
     }
 
@@ -165,20 +167,39 @@
     
     
     
-
-
-
+    public static void setEvents(String events) {
+        java.util.Arrays.fill(VIEW_EVENTS,false);
+        StringTokenizer t = new StringTokenizer(events,",");
+        while (t.hasMoreTokens() ) {
+            int idx = Integer.parseInt(t.nextToken());
+            VIEW_EVENTS[idx] = true;
+        }
+    }
+    
     public static void main(String[] args) throws Exception {
         System.out.println("Usage:");
-        System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo [channel-count multi-thread]");
+        System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)");
         System.out.println("Example:");
         System.out.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels");
-        System.out.println("\tjava o.a.c.t.d.CoordinationDemo 10 -> starts demo single threaded start/stop with 10 channels");
-        System.out.println("\tjava o.a.c.t.d.CoordinationDemo 7 true-> starts demo multi threaded start/stop with 7 channels");
+        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels");
+        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen");
+        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event");
         System.out.println();
-
-        if ( args.length >= 1 ) CHANNEL_COUNT = Integer.parseInt(args[0]);
-        if ( args.length >= 2 ) MULTI_THREAD = true;
+        java.util.Arrays.fill(VIEW_EVENTS,true);
+        
+        for (int i=0; i<args.length; i++ ) {
+            if ( "-c".equals(args[i]) )
+                CHANNEL_COUNT = Integer.parseInt(args[++i]);
+            else if ( "-t".equals(args[i]) )
+                MULTI_THREAD = Boolean.parseBoolean(args[++i]);
+            else if ( "-s".equals(args[i]) )
+                SLEEP_TIME = Long.parseLong(args[++i]);
+            else if ( "-sc".equals(args[i]) )
+                CLEAR_SCREEN = Integer.parseInt(args[++i]);
+            else if ( "-p".equals(args[i]) )
+                setEvents(args[++i]);
+            else if ( "-h".equals(args[i]) ) System.exit(0);
+        }
         CoordinationDemo demo = new CoordinationDemo();
         demo.waitForInput();
     }
@@ -265,6 +286,9 @@
                 status = "Start failed:"+x.getMessage();
                 error = x;
                 startstatus = "failed";
+                try { channel.stop(GroupChannel.DEFAULT);}catch(Exception ignore){}
+                channel = null;
+                interceptor = null;
             }
         }
         
@@ -298,11 +322,10 @@
             interceptor = new NonBlockingCoordinator() {
                 public void fireInterceptorEvent(InterceptorEvent event) {
                     status = event.getEventTypeDesc();
-//                    if ( event instanceof NonBlockingCoordinator.CoordinationEvent &&
-//                         ((NonBlockingCoordinator.CoordinationEvent)event).getEventType() == NonBlockingCoordinator.CoordinationEvent.EVT_CONF_RX)
-                        parent.printScreen();
+                    int type = event.getEventType();
+                    boolean display = VIEW_EVENTS[type];
+                    if ( display ) parent.printScreen();
                     try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){}
-                    
                 }
             };
             channel.addInterceptor(interceptor);



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org