You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/02 17:31:53 UTC

svn commit: r398964 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: ./ membership/

Author: fhanik
Date: Tue May  2 08:31:11 2006
New Revision: 398964

URL: http://svn.apache.org/viewcvs?rev=398964&view=rev
Log:
Added in a stop message for a soft shutdown
Added in the ability to send a payload with each broadcast package

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java Tue May  2 08:31:11 2006
@@ -73,4 +73,10 @@
      */
     public byte[] getUniqueId();
     
+    /**
+     * Returns the payload associated with this member.
+     * @return byte[] the payload bytes
+     */
+    public byte[] getPayload();
+    
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Tue May  2 08:31:11 2006
@@ -108,5 +108,12 @@
      * removes the membership listener.
      */
     public void removeMembershipListener();
+    
+    /**
+     * Sets the payload that is sent along with each membership
+     * broadcast.
+     * @param payload byte[]
+     */
+    public void setPayload(byte[] payload);
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java Tue May  2 08:31:11 2006
@@ -134,7 +134,7 @@
         synchronized (members) {
             int n = -1;
             for (int i = 0; i < members.length; i++) {
-                if (members[i] == member) {
+                if (members[i] == member || members[i].equals(member)) {
                     n = i;
                     break;
                 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Tue May  2 08:31:11 2006
@@ -68,6 +68,8 @@
     protected MemberImpl localMember ;
     private int mcastSoTimeout;
     private int mcastTTL;
+    
+    protected byte[] payload;
 
     /**
      * Create a membership service.
@@ -232,6 +234,7 @@
             localMember.setPort(port);
             localMember.setMemberAliveTime(100);
         }
+        if ( this.payload != null ) localMember.setPayload(payload);
         localMember.setUniqueId(UUIDGenerator.randomUUID(true));
         localMember.setServiceStartTime(System.currentTimeMillis());
         java.net.InetAddress bind = null;
@@ -372,9 +375,18 @@
     public int getMcastTTL() {
         return mcastTTL;
     }
+
+    public byte[] getPayload() {
+        return payload;
+    }
+
     public void setMcastTTL(int mcastTTL) {
         this.mcastTTL = mcastTTL;
         properties.setProperty("mcastTTL", String.valueOf(mcastTTL));
+    }
+
+    public void setPayload(byte[] payload) {
+        this.payload = payload;
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue May  2 08:31:11 2006
@@ -23,6 +23,8 @@
 import java.net.MulticastSocket;
 
 import org.apache.catalina.tribes.MembershipListener;
+import java.util.Arrays;
+import java.net.SocketTimeoutException;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -97,9 +99,20 @@
      */
     protected long serviceStartTime = System.currentTimeMillis();
     
+    /**
+     * Time to live for the multicast packets that are being sent out
+     */
     protected int mcastTTL = -1;
+    /**
+     * Read timeout on the mcast socket
+     */
     protected int mcastSoTimeout = -1;
+    /**
+     * Bind address for the multicast socket (null by default)
+     */
     protected InetAddress mcastBindAddress = null;
+    
+    protected static final byte[] STOP_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88};
 
     /**
      * Create a new mcast service impl
@@ -130,10 +143,10 @@
         this.mcastTTL = ttl;
         this.mcastBindAddress = bind;
         setupSocket();
-        sendPacket = new DatagramPacket(new byte[1000],1000);
+        sendPacket = new DatagramPacket(new byte[1024],1024);
         sendPacket.setAddress(address);
         sendPacket.setPort(port);
-        receivePacket = new DatagramPacket(new byte[1000],1000);
+        receivePacket = new DatagramPacket(new byte[1024],1024);
         receivePacket.setAddress(address);
         receivePacket.setPort(port);
         membership = new McastMembership(member);
@@ -193,10 +206,18 @@
      * @throws IOException if the service fails to disconnect from the sockets
      */
     public synchronized void stop() throws IOException {
-        socket.leaveGroup(address);
         doRun = false;
+        sender.interrupt();
+        receiver.interrupt();
         sender = null;
         receiver = null;
+        //send a stop message
+        byte[] payload = member.getPayload();
+        member.setPayload(STOP_PAYLOAD);
+        send();
+        //restore payload
+        member.setPayload(payload);
+        socket.leaveGroup(address);
         serviceStartTime = Long.MAX_VALUE;
     }
 
@@ -205,23 +226,42 @@
      * @throws IOException
      */
     public void receive() throws IOException {
-        socket.receive(receivePacket);
-        byte[] data = new byte[receivePacket.getLength()];
-        System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
-        MemberImpl m = MemberImpl.getMember(data);
-        if(log.isDebugEnabled())
-            log.debug("Mcast receive ping from member " + m);
+        checkExpired();
+        try {
+            socket.receive(receivePacket);
+            byte[] data = new byte[receivePacket.getLength()];
+            System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length);
+            MemberImpl m = MemberImpl.getMember(data);
+            if (log.isDebugEnabled())
+                log.debug("Mcast receive ping from member " + m);
 
-        if ( membership.memberAlive(m) ) {
-            if(log.isDebugEnabled())
-                log.debug("Mcast add member " + m);
-            service.memberAdded(m);
+            if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) {
+                if (log.isDebugEnabled())
+                    log.debug("Mcast member shutdown" + m);
+                membership.removeMcastMember(m);
+                service.memberDisappeared(m);
+            } else if (membership.memberAlive(m)) {
+                if (log.isDebugEnabled())
+                    log.debug("Mcast add member " + m);
+                service.memberAdded(m);
+            } //end if
+        } catch (SocketTimeoutException x ) { 
+            //do nothing, this is normal, we don't want to block forever
+            //since the receive thread is the same thread
+            //that does membership expiration
         }
+    }
+
+    protected synchronized void checkExpired() {
         MemberImpl[] expired = membership.expire(timeToExpiration);
         for ( int i=0; i<expired.length; i++) {
             if(log.isDebugEnabled())
-                log.debug("Mcast exipre  member " + m);
-            service.memberDisappeared(expired[i]);
+                log.debug("Mcast exipre  member " + expired[i]);
+            try {
+                service.memberDisappeared(expired[i]);
+            }catch ( Exception x ) { 
+                log.error("Unable to process member disappeared message.",x);
+            }
         }
     }
 
@@ -229,7 +269,7 @@
      * Send a ping
      * @throws Exception
      */
-    public void send() throws Exception{
+    public void send() throws IOException{
         member.inc();
         if(log.isDebugEnabled())
             log.debug("Mcast send ping from member " + member);
@@ -238,6 +278,7 @@
         p.setAddress(address);
         p.setPort(port);
         socket.send(p);
+        checkExpired();
     }
 
     public long getServiceStartTime() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Tue May  2 08:31:11 2006
@@ -83,8 +83,22 @@
      */
     protected transient long serviceStartTime;
     
+    /**
+     * To avoid serialization over and over again, once the local dataPkg
+     * has been set, we use that to transmit data
+     */
     protected transient byte[] dataPkg = null;
-    private byte[] uniqueId = new byte[16];
+
+    /**
+     * Unique session Id for this member
+     */
+    protected byte[] uniqueId = new byte[16];
+    
+    /**
+     * Custom payload that an app framework can broadcast.
+     * Also used to transport the stop command.
+     */
+    protected byte[] payload = new byte[0];
 
     /**
      * Empty constructor for serialization
@@ -178,13 +192,15 @@
         //dlen - 4 bytes
         //domain - dlen bytes
         //uniqueId - 16 bytes
+        //plen - 4 bytes
+        //payload - plen bytes
         
         
         byte[] domaind = this.domain;
         byte[] addr = host;
         long alive=System.currentTimeMillis()-getServiceStartTime();
         byte hl = (byte)addr.length;
-        byte[] data = new byte[8+4+1+addr.length+4+domaind.length+16];
+        byte[] data = new byte[8+4+1+addr.length+4+domaind.length+16+4+payload.length];
         int pos = 0;
         //alive data
         XByteBuffer.toBytes((long)alive,data,0);
@@ -205,6 +221,14 @@
         pos+=domaind.length;
         //unique Id
         System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
+        pos+=uniqueId.length;
+        //payload
+        XByteBuffer.toBytes(payload.length,data,pos);
+        pos+=4;
+        System.arraycopy(payload,0,data,pos,payload.length);
+        pos+=payload.length;
+
+        //create local data
         dataPkg = data;
         return data;
     }
@@ -247,12 +271,21 @@
        
        byte[] uniqueId = new byte[16];
        System.arraycopy(data, pos, uniqueId, 0, 16);
+       pos+=16;
+       
+       int pl = XByteBuffer.toInt(data,pos);
+       pos+=4;
+       
+       byte[] payload = new byte[pl];
+       System.arraycopy(data, pos, payload, 0, payload.length);
+       pos+=payload.length;
        
        member.domain = domaind;
        member.setHost(addr);
        member.setPort(XByteBuffer.toInt(portd, 0));
        member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
        member.setUniqueId(uniqueId);
+       member.payload = payload;
        
        member.dataPkg = new byte[data.length];
        System.arraycopy(data,0,member.dataPkg,0,data.length);
@@ -327,6 +360,10 @@
         return uniqueId;
     }
 
+    public byte[] getPayload() {
+        return payload;
+    }
+
     public void setMemberAliveTime(long time) {
        memberAliveTime=time;
     }
@@ -343,7 +380,8 @@
         buf.append(getHostname()).append(",");
         buf.append(port).append(", alive=");
         buf.append(memberAliveTime).append(",");
-        buf.append("id=").append(bToS(this.uniqueId));
+        buf.append("id=").append(bToS(this.uniqueId)).append(", ");
+        buf.append("payload=").append(bToS(this.payload)).append(", ");
         buf.append("]");
         return buf.toString();
     }
@@ -466,6 +504,10 @@
 
     public void setUniqueId(byte[] uniqueId) {
         this.uniqueId = uniqueId;
+    }
+
+    public void setPayload(byte[] payload) {
+        this.payload = payload;
     }
 
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org