You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/17 21:56:22 UTC
svn commit: r407359 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/group/RpcChannel.java
src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
to-do.txt
Author: fhanik
Date: Wed May 17 12:56:22 2006
New Revision: 407359
URL: http://svn.apache.org/viewvc?rev=407359&view=rev
Log:
Implemented soft membership ping
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=407359&r1=407358&r2=407359&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java Wed May 17 12:56:22 2006
@@ -90,7 +90,7 @@
}
} catch ( InterruptedException ix ) {
Thread.currentThread().interrupted();
- throw new ChannelException(ix);
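+ //throw new ChannelException(ix); NOTE(review): the interrupt is now swallowed here, and the Thread.interrupted() call in this catch block is static and CLEARS the interrupt flag - Thread.currentThread().interrupt() was probably intended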
}finally {
responseMap.remove(key);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=407359&r1=407358&r2=407359&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed May 17 12:56:22 2006
@@ -16,7 +16,6 @@
package org.apache.catalina.tribes.tipis;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
@@ -33,10 +32,15 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.catalina.tribes.Heartbeat;
+import java.util.HashMap;
import org.apache.catalina.tribes.group.*;
/**
@@ -51,7 +55,7 @@
* @author not attributable
* @version 1.0
*/
-public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener {
+public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
/**
@@ -78,10 +82,12 @@
private transient byte[] mapContextName;
private transient boolean stateTransferred = false;
private transient Object stateMutex = new Object();
- private transient ArrayList mapMembers = new ArrayList();
+ private transient HashMap mapMembers = new HashMap();
private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
private transient Object mapOwner;
private transient ClassLoader[] externalLoaders;
+ protected transient int currentNode = 0;
+ private transient long accessTimeout = 5000;
//------------------------------------------------------------------------------
@@ -149,6 +155,34 @@
}
+ private void ping(long timeout) throws ChannelException { //timeout: max age (ms) of a member's last-seen stamp before it is expired; ChannelException propagates from the channel calls
+ //soft membership ping: send MSG_INIT to all channel members (ALL_REPLY), mark every responder alive, then expire silent map members
+ MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT,
+ false, null, null, null, wrap(channel.getLocalMember(false)));
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ memberAlive(resp[i].getSource()); //registers the responder if unknown and refreshes its last-seen stamp
+ }//for
+
+ synchronized (mapMembers) {
+ //walk a snapshot of the keys: memberDisappeared() removes from mapMembers, which would make a live HashMap iterator throw ConcurrentModificationException
+ Member[] members = (Member[])mapMembers.keySet().toArray(new Member[mapMembers.size()]);
+ long now = System.currentTimeMillis();
+ for (int i = 0; i < members.length; i++) {
+ Long access = (Long)mapMembers.get(members[i]); //null if an earlier memberDisappeared() call already dropped this member
+ if ( access != null && (now - access.longValue()) > timeout ) memberDisappeared(members[i]);
+ }//for
+ }//synch
+ }
+
+ private void memberAlive(Member member) { //records that 'member' was just heard from; called for each ping responder and when a MSG_INIT message arrives
+ synchronized (mapMembers) {
+ if (!mapMembers.containsKey(member)) { //first contact: register the member through mapMemberAdded()
+ mapMemberAdded(member);
+ } //end if
+ mapMembers.put(member, new Long(System.currentTimeMillis())); //(re)stamp the last-seen time that ping() compares against its timeout
+ }
+ }
private void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
@@ -190,21 +224,22 @@
//------------------------------------------------------------------------------
// GROUP COM INTERFACES
//------------------------------------------------------------------------------
- public Member[] getMapMembers() {
- synchronized (mapMembers) {
- Member[] result = new Member[mapMembers.size()];
- mapMembers.toArray(result);
+ public Member[] getMapMembers(HashMap members) { //returns the keys of 'members' (Member objects) as a new array
+ synchronized (members) { //lock the map that was passed in (mapMembers itself or a clone of it) while copying
+ Member[] result = new Member[members.size()];
+ members.keySet().toArray(result); //only the keys are copied; the values (Long last-seen stamps) are ignored
return result;
}
}
+ public Member[] getMapMembers() { //snapshot of the members currently registered in mapMembers
+ return getMapMembers(this.mapMembers);
+ }
public Member[] getMapMembersExcl(Member[] exclude) {
synchronized (mapMembers) {
- ArrayList list = (ArrayList)mapMembers.clone();
+ HashMap list = (HashMap)mapMembers.clone(); //work on a copy so the removals below never touch the live membership map
for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
- Member[] result = new Member[list.size()];
- list.toArray(result);
- return result;
+ return getMapMembers(list); //remaining keys = every map member except those listed in 'exclude'
}
}
@@ -375,6 +410,8 @@
mapmsg.deserialize(getExternalLoaders());
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
+ memberAlive(mapmsg.getBackupNodes()[0]);
}
} catch (IOException x ) {
log.error("Unable to deserialize MapMessage.",x);
@@ -469,8 +506,8 @@
boolean memberAdded = false;
//select a backup node if we don't have one
synchronized (mapMembers) {
- if (!mapMembers.contains(member) ) {
- mapMembers.add(member);
+ if (!mapMembers.containsKey(member) ) {
+ mapMembers.put(member, new Long(System.currentTimeMillis()));
memberAdded = true;
}
}
@@ -520,7 +557,7 @@
public void memberDisappeared(Member member) {
boolean removed = false;
synchronized (mapMembers) {
- removed = mapMembers.remove(member);
+ removed = (mapMembers.remove(member) != null );
}
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
@@ -537,7 +574,6 @@
} //while
}
- protected int currentNode = 0;
public int getNextBackupIndex() {
int size = mapMembers.size();
if (mapMembers.size() == 0)return -1;
@@ -557,6 +593,14 @@
}
protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+
+ public void heartbeat() { //runs one soft-membership ping; NOTE(review): presumably the Heartbeat interface callback - confirm against the interface
+ try {
+ ping(accessTimeout); //accessTimeout is passed as the expiry threshold for silent members
+ }catch ( Exception x ) { //best effort: a failed ping is logged and not propagated to the caller
+ log.error("Unable to send AbstractReplicatedMap.ping message",x);
+ }
+ }
//------------------------------------------------------------------------------
// METHODS TO OVERRIDE
@@ -901,6 +945,10 @@
return channelSendOptions;
}
+ public long getAccessTimeout() { //timeout used by the membership ping, in milliseconds (initial value 5000)
+ return accessTimeout;
+ }
+
public void setMapOwner(Object mapOwner) {
this.mapOwner = mapOwner;
}
@@ -911,6 +959,10 @@
public void setChannelSendOptions(int channelSendOptions) {
this.channelSendOptions = channelSendOptions;
+ }
+
+ public void setAccessTimeout(long accessTimeout) { //milliseconds; ping() narrows this value to int for the RPC call, so keep it within int range
+ this.accessTimeout = accessTimeout;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407359&r1=407358&r2=407359&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Wed May 17 12:56:22 2006
@@ -41,21 +41,6 @@
Code Tasks:
===========================================
-46. Heartbeat Interface, to notify listeners as well
-
-44. Soft membership failure detection, ie if a webapp is stopped, but
- the AbstractReplicatedMap doesn't broadcast a stop message
- This is one potential solution:
- 1. keep a static WeakHashMap of all map implementations running
- so that we can share one heartbeat thread for timeouts
- 2. everytime a message is received, update the last check time for that
- member so that we don't need the thread to actively check
- 3. when the thread wakes up, it will check maps that are outside
- the valid range for check time,
- 4. send a RPC message, if no reply, remove the map from itself
- Other solution, use the TcpFailureDetector, catch send errors
-
-
45. McastServiceImpl.receive should have a SO_TIMEOUT so that we can check
for members dropping on the same thread
@@ -286,3 +271,18 @@
component, only the listener
Notes: Completed. added in correct startup sequences.
+46. Heartbeat Interface, to notify listeners as well
+Notes: Implemented
+
+44. Soft membership failure detection, ie if a webapp is stopped, but
+ the AbstractReplicatedMap doesn't broadcast a stop message
+ This is one potential solution:
+ 1. keep a static WeakHashMap of all map implementations running
+ so that we can share one heartbeat thread for timeouts
+ 2. everytime a message is received, update the last check time for that
+ member so that we don't need the thread to actively check
+ 3. when the thread wakes up, it will check maps that are outside
+ the valid range for check time,
+ 4. send a RPC message, if no reply, remove the map from itself
+ Other solution, use the TcpFailureDetector, catch send errors
+Notes: Implemented using a periodic ping in the AbstractReplicatedMap
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org