You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/03 20:48:04 UTC

svn commit: r399382 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: Channel.java MembershipService.java group/ChannelCoordinator.java membership/McastService.java membership/McastServiceImpl.java

Author: fhanik
Date: Wed May  3 11:48:01 2006
New Revision: 399382

URL: http://svn.apache.org/viewcvs?rev=399382&view=rev
Log:
Start levels are now respected correctly. This will make unit testing easier, since we can shut down and start different components to simulate errors.

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Wed May  3 11:48:01 2006
@@ -30,9 +30,9 @@
      * Start and stop sequences can be controlled by these constants
      */
     public static final int DEFAULT = 15;
-    public static final int MBR_RX_SEQ = 1;
+    public static final int SND_RX_SEQ = 1;
     public static final int SND_TX_SEQ = 2;
-    public static final int SND_RX_SEQ = 4;
+    public static final int MBR_RX_SEQ = 4;
     public static final int MBR_TX_SEQ = 8;
     
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Wed May  3 11:48:01 2006
@@ -27,8 +27,8 @@
 
 public interface MembershipService {
     
-    public static final int MBR_RX = 1;
-    public static final int MBR_TX = 2;
+    public static final int MBR_RX = Channel.MBR_RX_SEQ;
+    public static final int MBR_TX = Channel.MBR_TX_SEQ;
     
     /**
      * Sets the properties for the membership service. This must be called before
@@ -52,17 +52,24 @@
     /**
      * Starts the membership service. If a membership listeners is added
      * the listener will start to receive membership events.
-     * @param level - level 1 starts listening for members, level 2 
+     * @param level - level MBR_RX starts listening for members, level MBR_TX 
      * starts broad casting the server
      * @throws java.lang.Exception if the service fails to start.
+     * @throws java.lang.IllegalArgumentException if the level is incorrect.
      */
     public void start(int level) throws java.lang.Exception;
 
 
     /**
-     * Stops the membership service
+     * Stops the membership service. Depending on the level, the service
+     * stops listening for members and/or stops broadcasting the server.
+     * @param level - level MBR_RX stops listening for members, level MBR_TX 
+     * stops broad casting the server
+     * @throws java.lang.Exception if the service fails to stop
+     * @throws java.lang.IllegalArgumentException if the level is incorrect.
      */
-    public void stop();
+
+    public void stop(int level);
     
     /**
      * Returns that cluster has members.

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Wed May  3 11:48:01 2006
@@ -41,10 +41,13 @@
     private ChannelReceiver clusterReceiver = new NioReceiver();
     private ChannelSender clusterSender = new ReplicationTransmitter();
     private MembershipService membershipService = new McastService();
-    private boolean started = false;
     
+    //override optionflag
     protected int optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE|Channel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+    public int getOptionFlag() {return optionFlag;}
+    public void setOptionFlag(int flag) {optionFlag=flag;}
     
+    private int startLevel = 0;
 
     public ChannelCoordinator() {
         
@@ -70,7 +73,7 @@
         if ( destination == null ) destination = membershipService.getMembers();
         clusterSender.sendMessage(msg,destination);
     }
-
+    
 
     /**
      * Starts up the channel. This can be called multiple times for individual services to start
@@ -84,7 +87,7 @@
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
     public void start(int svc) throws ChannelException {
-        this.start();
+        this.internalStart(svc);
     }
 
     /**
@@ -99,7 +102,7 @@
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
     public void stop(int svc) throws ChannelException {
-        this.stop();
+        this.internalStop(svc);
     }    
 
 
@@ -114,20 +117,38 @@
      * SND_RX_SEQ - starts the replication receiver<BR>
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
-    protected synchronized void start() throws ChannelException {
+    protected synchronized void internalStart(int svc) throws ChannelException {
         try {
-            if (started) return;
+            boolean valid = false;
+            if (startLevel == Channel.DEFAULT) return;
+
             //must start the receiver first so that we can coordinate the port it
             //listens to with the local membership settings
-            clusterReceiver.setMessageListener(this);
-            clusterReceiver.start();
-            clusterSender.start();
-            //synchronize, big time FIXME
-            membershipService.setMembershipListener(this);
-            membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
-            membershipService.start(MembershipService.MBR_RX);
-            membershipService.start(MembershipService.MBR_TX);
-            this.started = true;
+            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
+                clusterReceiver.setMessageListener(this);
+                clusterReceiver.start();
+                //synchronize, big time FIXME
+                membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
+                valid = true;
+            }
+            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
+                clusterSender.start();
+                valid = true;
+            }
+            
+            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
+                membershipService.setMembershipListener(this);
+                membershipService.start(MembershipService.MBR_RX);
+                valid = true;
+            }
+            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
+                membershipService.start(MembershipService.MBR_TX);
+                valid = true;
+            }            
+            if ( !valid) {
+                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
+            }
+            startLevel = (startLevel | svc);
         }catch ( ChannelException cx ) {
             throw cx;
         }catch ( Exception x ) {
@@ -146,17 +167,40 @@
      * SND_RX_SEQ - starts the replication receiver<BR>
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
-    protected synchronized void stop() throws ChannelException {
+    protected synchronized void internalStop(int svc) throws ChannelException {
         try {
-            if ( !started ) return;
-            membershipService.stop();
-            clusterReceiver.stop();
-            clusterSender.stop();
-            membershipService.stop();
+            boolean valid = false;
+            if ( startLevel == 0 ) return;
+            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
+                clusterReceiver.stop();
+                clusterReceiver.setMessageListener(null);
+                valid = true;
+            }
+            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
+                clusterSender.stop();
+                valid = true;
+            }
+
+            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
+                membershipService.stop(MembershipService.MBR_RX);
+                membershipService.setMembershipListener(null);
+                valid = true;
+                
+            }
+            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
+                valid = true;
+                membershipService.stop(MembershipService.MBR_TX);
+            }            
+            if ( !valid) {
+                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
+            }
+
+            startLevel = (startLevel & (~svc));
+            
         }catch ( Exception x ) {
             throw new ChannelException(x);
         } finally {
-            started = false;
+            
         }
 
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Wed May  3 11:48:01 2006
@@ -205,7 +205,7 @@
      * @param name The property to check for
      */
     protected void hasProperty(Properties properties, String name){
-        if ( properties.getProperty(name)==null) throw new IllegalArgumentException("Required property \""+name+"\" is missing.");
+        if ( properties.getProperty(name)==null) throw new IllegalArgumentException("McastService:Required property \""+name+"\" is missing.");
     }
 
     /**
@@ -218,6 +218,14 @@
     }
     
     public void start(int level) throws java.lang.Exception {
+        hasProperty(properties,"mcastPort");
+        hasProperty(properties,"mcastAddress");
+        hasProperty(properties,"mcastClusterDomain");
+        hasProperty(properties,"memberDropTime");
+        hasProperty(properties,"mcastFrequency");
+        hasProperty(properties,"tcpListenPort");
+        hasProperty(properties,"tcpListenHost");
+
         if ( impl != null ) {
             impl.start(level);
             return;
@@ -279,13 +287,12 @@
     /**
      * Stop broadcasting and listening to membership pings
      */
-    public void stop() {
+    public void stop(int svc) {
         try  {
-            if ( impl != null) impl.stop();
+            if ( impl != null && impl.stop(svc) ) impl = null;
         } catch ( Exception x)  {
-            log.error("Unable to stop the mcast service.",x);
+            log.error("Unable to stop the mcast service, level:"+svc+".",x);
         }
-        impl = null;
     }
 
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Wed May  3 11:48:01 2006
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.net.SocketTimeoutException;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.Channel;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -45,7 +46,9 @@
     /**
      * Internal flag used for the listen thread that listens to the multicasting socket.
      */
-    protected boolean doRun = false;
+    protected boolean doRunSender = false;
+    protected boolean doRunReceiver = false;
+    protected int startLevel = 0;
     /**
      * Socket that we intend to listen to
      */
@@ -183,44 +186,70 @@
      * @throws IllegalStateException if the service is already started
      */
     public synchronized void start(int level) throws IOException {
-        if ( sender != null && receiver != null ) throw new IllegalStateException("Service already running.");
-        if ( level == 1 ) {
+        boolean valid = false;
+        if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
+            if ( receiver != null ) throw new IllegalStateException("McastService.receive already running.");
             socket.joinGroup(address);
-            doRun = true;
+            doRunReceiver = true;
             receiver = new ReceiverThread();
             receiver.setDaemon(true);
             receiver.start();
-        }
-        if ( level==2 ) {
+            valid = true;
+        } 
+        if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
+            if ( sender != null ) throw new IllegalStateException("McastService.send already running.");
+            doRunSender = true;
             serviceStartTime = System.currentTimeMillis();
             sender = new SenderThread(sendFrequency);
             sender.setDaemon(true);
             sender.start();
-            
+            valid = true;
+        } 
+        if (!valid) {
+            throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
         }
+        startLevel = (startLevel | level);
     }
 
     /**
      * Stops the service
      * @throws IOException if the service fails to disconnect from the sockets
      */
-    public synchronized void stop() throws IOException {
-        doRun = false;
-        sender.interrupt();
-        receiver.interrupt();
-        sender = null;
-        receiver = null;
-        //send a stop message
-        byte[] payload = member.getPayload();
-        member.setPayload(Member.SHUTDOWN_PAYLOAD);
-        member.getData(true,true);
-        send();
-        //restore payload
-        member.setPayload(payload);
-        member.getData(true,true);
-        //leave mcast group
-        socket.leaveGroup(address);
-        serviceStartTime = Long.MAX_VALUE;
+    public synchronized boolean stop(int level) throws IOException {
+        boolean valid = false;
+        
+        if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
+            valid = true;
+            doRunReceiver = false;
+            receiver.interrupt();
+            receiver = null;
+        } 
+        if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
+            valid = true;
+            doRunSender = false;
+            sender.interrupt();
+            sender = null;
+        } 
+        
+        if (!valid) {
+            throw new IllegalArgumentException("Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
+        }
+        startLevel = (startLevel & (~level));
+        //all levels are stopped: send a shutdown message and leave the mcast group
+        if ( startLevel == 0 ) {
+            //send a stop message
+            byte[] payload = member.getPayload();
+            member.setPayload(Member.SHUTDOWN_PAYLOAD);
+            member.getData(true, true);
+            send();
+            //restore payload
+            member.setPayload(payload);
+            member.getData(true, true);
+            //leave mcast group
+            socket.leaveGroup(address);
+            serviceStartTime = Long.MAX_VALUE;
+        }
+        return (startLevel == 0);
     }
 
     /**
@@ -293,7 +322,7 @@
             setName("Cluster-MembershipReceiver");
         }
         public void run() {
-            while ( doRun ) {
+            while ( doRunReceiver ) {
                 try {
                     receive();
                 } catch ( Exception x ) {
@@ -313,7 +342,7 @@
 
         }
         public void run() {
-            while ( doRun ) {
+            while ( doRunSender ) {
                 try {
                     send();
                 } catch ( Exception x ) {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org