You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/10/10 18:03:37 UTC
svn commit: r454797 - in
/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes: Channel.java
tipis/AbstractReplicatedMap.java
Author: fhanik
Date: Tue Oct 10 09:03:36 2006
New Revision: 454797
URL: http://svn.apache.org/viewvc?view=rev&rev=454797
Log:
Added in documentation
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java?view=diff&rev=454797&r1=454796&r2=454797
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java Tue Oct 10 09:03:36 2006
@@ -123,7 +123,7 @@
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshalling or unmarshalling will
* be performed.<br>
*
@@ -136,7 +136,7 @@
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_USE_ACK - Message is sent and an ACK is received when the message has been received by the recipient<br>
* If no ack is received, the message is not considered successful<br>
* @see #send(Member[], Serializable , int)
@@ -148,7 +148,7 @@
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and
* processed by the recipient<br>
* If no ack is received, the message is not considered successful<br>
@@ -161,7 +161,7 @@
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and
* processed by the recipient<br>
* If no ack is received, the message is not considered successful<br>
@@ -170,12 +170,23 @@
*/
public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008;
+ /**
+ * Send options, when a message is sent, it can have an option flag
+ * to trigger certain behavior. Most flags are used to trigger channel interceptors
+ * as the message passes through the channel stack. <br>
+ * However, there are five default flags that every channel implementation must implement<br>
+ * SEND_OPTIONS_SECURE - Message is sent over an encrypted channel<br>
+ * @see #send(Member[], Serializable , int)
+ * @see #send(Member[], Serializable, int, ErrorHandler)
+ */
+ public static final int SEND_OPTIONS_SECURE = 0x0010;
+
/**
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_DEFAULT - the default sending options, just a helper variable. <br>
* The default is <code>int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;</code><br>
* @see #SEND_OPTIONS_USE_ACK
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=454797&r1=454796&r2=454797
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Oct 10 09:03:36 2006
@@ -46,18 +46,9 @@
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
-import java.util.ConcurrentModificationException;
-
/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
*
- * @author not attributable
+ * @author Filip Hanik
* @version 1.0
*/
public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
@@ -81,19 +72,68 @@
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
- private transient long rpcTimeout = 5000;
- private transient Channel channel;
- private transient RpcChannel rpcChannel;
- private transient byte[] mapContextName;
- private transient boolean stateTransferred = false;
- private transient Object stateMutex = new Object();
- private transient HashMap mapMembers = new HashMap();
- private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
- private transient Object mapOwner;
- private transient ClassLoader[] externalLoaders;
+
+ /**
+ * Timeout for RPC messages, how long we will wait for a reply
+ */
+ protected transient long rpcTimeout = 5000;
+ /**
+ * Reference to the channel for sending messages
+ */
+ protected transient Channel channel;
+ /**
+ * The RpcChannel to send RPC messages through
+ */
+ protected transient RpcChannel rpcChannel;
+ /**
+ * The Map context name makes this map unique, this
+ * allows us to have more than one map shared
+ * through one channel
+ */
+ protected transient byte[] mapContextName;
+ /**
+ * Has the state been transferred
+ */
+ protected transient boolean stateTransferred = false;
+ /**
+ * Simple lock object for transfers
+ */
+ protected transient Object stateMutex = new Object();
+ /**
+ * A list of members in our map
+ */
+ protected transient HashMap mapMembers = new HashMap();
+ /**
+ * Our default send options
+ */
+ protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+ /**
+ * The owner of this map, e.g. a SessionManager
+ */
+ protected transient Object mapOwner;
+ /**
+ * External class loaders, used so that serialization and deserialization can be performed successfully.
+ */
+ protected transient ClassLoader[] externalLoaders;
+
+ /**
+ * The node we are currently backing up data to, this index will rotate
+ * on a round robin basis
+ */
protected transient int currentNode = 0;
- private transient long accessTimeout = 5000;
- private transient String mapname = "";
+
+ /**
+ * Since the map keeps internal membership,
+ * this is the timeout for a ping message to be responded to.
+ * If a remote map doesn't respond within this timeframe,
+ * it's considered dead.
+ */
+ protected transient long accessTimeout = 5000;
+
+ /**
+ * Readable string of the mapContextName value
+ */
+ protected transient String mapname = "";
//------------------------------------------------------------------------------
@@ -122,12 +162,27 @@
}
+ /**
+ * Helper method, wraps a single member in an array
+ * @param m Member
+ * @return Member[]
+ */
protected Member[] wrap(Member m) {
if ( m == null ) return new Member[0];
else return new Member[] {m};
}
- private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
+ /**
+ * Initializes the map by creating the RPC channel and registering itself as a channel listener.
+ * This method is also responsible for initiating the state transfer
+ * @param owner Object
+ * @param channel Channel
+ * @param mapContextName String
+ * @param timeout long
+ * @param channelSendOptions int
+ * @param cls ClassLoader[]
+ */
+ protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
this.mapOwner = owner;
this.externalLoaders = cls;
@@ -147,32 +202,52 @@
//create an rpc channel and add the map as a listener
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+ //add this map as a message listener
this.channel.addChannelListener(this);
+ //listen for membership notifications
this.channel.addMembershipListener(this);
try {
+ //broadcast our map, this just notifies other members of our existence
broadcast(MapMessage.MSG_INIT, true);
//transfer state from another map
transferState();
+ //state is transferred, we are ready for messaging
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
log.warn("Unable to send map start message.");
throw new RuntimeException("Unable to start replicated map.",x);
}
-
}
- private void ping(long timeout) throws ChannelException {
+ /**
+ * Sends a ping out to all the members in the cluster, not just map members,
+ * to announce that this map is alive.
+ * @param timeout long
+ * @throws ChannelException
+ */
+ protected void ping(long timeout) throws ChannelException {
//send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT,
- false, null, null, null, wrap(channel.getLocalMember(false)));
- Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout);
- for (int i = 0; i < resp.length; i++) {
- memberAlive(resp[i].getSource());
- }//for
-
+ MapMessage msg = new MapMessage(this.mapContextName,
+ MapMessage.MSG_INIT,
+ false,
+ null,
+ null,
+ null,
+ wrap(channel.getLocalMember(false)));
+ if ( channel.getMembers().length > 0 ) {
+ //send a ping, wait for all nodes to reply
+ Response[] resp = rpcChannel.send(channel.getMembers(),
+ msg, rpcChannel.ALL_REPLY,
+ (channelSendOptions),
+ (int) accessTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ memberAlive(resp[i].getSource());
+ } //for
+ }
+ //update our map of members, expire some if we didn't receive a ping back
synchronized (mapMembers) {
Iterator it = mapMembers.entrySet().iterator();
long now = System.currentTimeMillis();
@@ -184,7 +259,11 @@
}//synch
}
- private void memberAlive(Member member) {
+ /**
+ * We have received a member alive notification
+ * @param member Member
+ */
+ protected void memberAlive(Member member) {
synchronized (mapMembers) {
if (!mapMembers.containsKey(member)) {
mapMemberAdded(member);
@@ -192,8 +271,14 @@
mapMembers.put(member, new Long(System.currentTimeMillis()));
}
}
-
- private void broadcast(int msgtype, boolean rpc) throws ChannelException {
+
+ /**
+ * Helper method to broadcast a message to all members in a channel
+ * @param msgtype int
+ * @param rpc boolean
+ * @throws ChannelException
+ */
+ protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null, wrap(channel.getLocalMember(false)));
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org