You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/17 21:56:22 UTC

svn commit: r407359 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/group/RpcChannel.java src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java to-do.txt

Author: fhanik
Date: Wed May 17 12:56:22 2006
New Revision: 407359

URL: http://svn.apache.org/viewvc?rev=407359&view=rev
Log:
Implemented soft membership ping

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=407359&r1=407358&r2=407359&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java Wed May 17 12:56:22 2006
@@ -90,7 +90,7 @@
             }
         } catch ( InterruptedException ix ) {
             Thread.currentThread().interrupted();
-            throw new ChannelException(ix);
+            //throw new ChannelException(ix);
         }finally {
             responseMap.remove(key);
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=407359&r1=407358&r2=407359&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed May 17 12:56:22 2006
@@ -16,7 +16,6 @@
 
 package org.apache.catalina.tribes.tipis;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -33,10 +32,15 @@
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.membership.MemberImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.catalina.tribes.Heartbeat;
+import java.util.HashMap;
 import org.apache.catalina.tribes.group.*;
 
 /**
@@ -51,7 +55,7 @@
  * @author not attributable
  * @version 1.0
  */
-public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener {
+public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
     protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
 
     /**
@@ -78,10 +82,12 @@
     private transient byte[] mapContextName;
     private transient boolean stateTransferred = false;
     private transient Object stateMutex = new Object();
-    private transient ArrayList mapMembers = new ArrayList();
+    private transient HashMap mapMembers = new HashMap();
     private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
     private transient Object mapOwner;
     private transient ClassLoader[] externalLoaders;
+    protected transient int currentNode = 0;
+    private transient long accessTimeout = 5000;
     
 
 //------------------------------------------------------------------------------
@@ -149,6 +155,34 @@
     }
     
     
+    /**
+     * Sends a MSG_INIT map message to channel.getMembers(), marks the source of
+     * every response as alive, and then reports as disappeared every tracked
+     * map member whose last recorded access time is older than
+     * <code>timeout</code>.
+     *
+     * @param timeout maximum age, in milliseconds, of a member's last access
+     *                time before memberDisappeared() is invoked for it
+     * @throws ChannelException declared for the RPC send
+     */
+    private void ping(long timeout) throws ChannelException {
+        //send out a map membership message using the ALL_REPLY reply option;
+        //accessTimeout is passed as the final argument of send (presumably the reply timeout)
+        MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT,
+                                        false, null, null, null, wrap(channel.getLocalMember(false)));
+        Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout);
+        for (int i = 0; i < resp.length; i++) {
+            memberAlive(resp[i].getSource());
+        }//for
+
+        synchronized (mapMembers) {
+            //iterate over a snapshot: memberDisappeared() removes the member from
+            //mapMembers, and removing from the live map while its iterator is open
+            //would make the next it.next() throw ConcurrentModificationException
+            Iterator it = ((HashMap)mapMembers.clone()).entrySet().iterator();
+            long now = System.currentTimeMillis();
+            while ( it.hasNext() ) {
+                Map.Entry entry = (Map.Entry)it.next();
+                long access = ((Long)entry.getValue()).longValue();
+                if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
+            }
+        }//synch
+    }
+
+    /**
+     * Records that a member has just been heard from. A member that is not yet
+     * a key of mapMembers is first handed to mapMemberAdded(); in every case the
+     * member's entry is then set to the current time in milliseconds.
+     *
+     * @param member the member to mark as alive
+     */
+    private void memberAlive(Member member) {
+        synchronized (mapMembers) {
+            boolean known = mapMembers.containsKey(member);
+            if (!known) mapMemberAdded(member);
+            Long lastSeen = new Long(System.currentTimeMillis());
+            mapMembers.put(member, lastSeen);
+        }
+    }
 
     private void broadcast(int msgtype, boolean rpc) throws ChannelException {
         //send out a map membership message, only wait for the first reply
@@ -190,21 +224,22 @@
 //------------------------------------------------------------------------------
 //              GROUP COM INTERFACES
 //------------------------------------------------------------------------------
-    public Member[] getMapMembers() {
-        synchronized (mapMembers) {
-            Member[] result = new Member[mapMembers.size()];
-            mapMembers.toArray(result);
+    public Member[] getMapMembers(HashMap members) {
+        synchronized (members) {
+            Member[] result = new Member[members.size()];
+            members.keySet().toArray(result);
             return result;
         }
     }
+    /**
+     * Returns the members currently tracked by this map.
+     *
+     * @return the result of getMapMembers(HashMap) applied to mapMembers
+     */
+    public Member[] getMapMembers() {
+        Member[] current = getMapMembers(mapMembers);
+        return current;
+    }
     
     public Member[] getMapMembersExcl(Member[] exclude) {
         synchronized (mapMembers) {
-            ArrayList list = (ArrayList)mapMembers.clone();
+            HashMap list = (HashMap)mapMembers.clone();
             for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
-            Member[] result = new Member[list.size()];
-            list.toArray(result);
-            return result;
+            return getMapMembers(list);
         }
     }
 
@@ -375,6 +410,8 @@
             mapmsg.deserialize(getExternalLoaders());
             if (mapmsg.getMsgType() == MapMessage.MSG_START) {
                 mapMemberAdded(mapmsg.getBackupNodes()[0]);
+            } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
+                memberAlive(mapmsg.getBackupNodes()[0]);
             }
         } catch (IOException x ) {
             log.error("Unable to deserialize MapMessage.",x);
@@ -469,8 +506,8 @@
         boolean memberAdded = false;
         //select a backup node if we don't have one
         synchronized (mapMembers) {
-            if (!mapMembers.contains(member) ) {
-                mapMembers.add(member);
+            if (!mapMembers.containsKey(member) ) {
+                mapMembers.put(member, new Long(System.currentTimeMillis()));
                 memberAdded = true;
             }
         }
@@ -520,7 +557,7 @@
     public void memberDisappeared(Member member) {
         boolean removed = false;
         synchronized (mapMembers) {
-            removed = mapMembers.remove(member);
+            removed = (mapMembers.remove(member) != null );
         }
         Iterator i = super.entrySet().iterator();
         while (i.hasNext()) {
@@ -537,7 +574,6 @@
         } //while
     }
 
-    protected int currentNode = 0;
     public int getNextBackupIndex() {
         int size = mapMembers.size();
         if (mapMembers.size() == 0)return -1;
@@ -557,6 +593,14 @@
     }
 
     protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+    
+    /**
+     * Heartbeat entry point: runs ping() with accessTimeout as the member
+     * expiry threshold. Any exception is logged and not propagated.
+     */
+    public void heartbeat() {
+        final long expiry = accessTimeout;
+        try {
+            ping(expiry);
+        } catch (Exception pingFailure) {
+            log.error("Unable to send AbstractReplicatedMap.ping message", pingFailure);
+        }
+    }
 
 //------------------------------------------------------------------------------    
 //              METHODS TO OVERRIDE    
@@ -901,6 +945,10 @@
         return channelSendOptions;
     }
 
+    /**
+     * @return the current accessTimeout value, in milliseconds
+     *         (5000 unless changed through setAccessTimeout)
+     */
+    public long getAccessTimeout() {
+        return this.accessTimeout;
+    }
+
     public void setMapOwner(Object mapOwner) {
         this.mapOwner = mapOwner;
     }
@@ -911,6 +959,10 @@
 
     public void setChannelSendOptions(int channelSendOptions) {
         this.channelSendOptions = channelSendOptions;
+    }
+
+    public void setAccessTimeout(long accessTimeout) {
+        this.accessTimeout = accessTimeout;
     }
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407359&r1=407358&r2=407359&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Wed May 17 12:56:22 2006
@@ -41,21 +41,6 @@
 Code Tasks:
 ===========================================
 
-46. Heartbeat Interface, to notify listeners as well
-
-44. Soft membership failure detection, ie if a webapp is stopped, but
-    the AbstractReplicatedMap doesn't broadcast a stop message
-    This is one potential solution:
-    1. keep a static WeakHashMap of all map implementations running
-       so that we can share one heartbeat thread for timeouts
-    2. everytime a message is received, update the last check time for that
-       member so that we don't need the thread to actively check
-    3. when the thread wakes up, it will check maps that are outside
-       the valid range for check time, 
-    4. send a RPC message, if no reply, remove the map from itself
-    Other solution, use the TcpFailureDetector, catch send errors 
-
-
 45. McastServiceImpl.receive should have a SO_TIMEOUT so that we can check
     for members dropping on the same thread
 
@@ -286,3 +271,18 @@
     component, only the listener
 Notes: Completed. added in correct startup sequences.
 
+46. Heartbeat Interface, to notify listeners as well
+Notes: Implemented
+
+44. Soft membership failure detection, ie if a webapp is stopped, but
+    the AbstractReplicatedMap doesn't broadcast a stop message
+    This is one potential solution:
+    1. keep a static WeakHashMap of all map implementations running
+       so that we can share one heartbeat thread for timeouts
+    2. every time a message is received, update the last check time for that
+       member so that we don't need the thread to actively check
+    3. when the thread wakes up, it will check maps that are outside
+       the valid range for check time, 
+    4. send a RPC message, if no reply, remove the map from itself
+    Other solution, use the TcpFailureDetector, catch send errors 
+Notes: Implemented using a periodic ping in the AbstractReplicatedMap
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org