You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/10/10 18:03:37 UTC

svn commit: r454797 - in /tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes: Channel.java tipis/AbstractReplicatedMap.java

Author: fhanik
Date: Tue Oct 10 09:03:36 2006
New Revision: 454797

URL: http://svn.apache.org/viewvc?view=rev&rev=454797
Log:
Added in documentation

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java?view=diff&rev=454797&r1=454796&r2=454797
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Channel.java Tue Oct 10 09:03:36 2006
@@ -123,7 +123,7 @@
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel interceptors
      * as the message passes through the channel stack. <br>
-     * However, there are four default flags that every channel implementation must implement<br>
+     * However, there are five default flags that every channel implementation must implement<br>
      * SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshalling or unmarshalling will
      * be performed.<br>
      * 
@@ -136,7 +136,7 @@
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel interceptors
      * as the message passes through the channel stack. <br>
-     * However, there are four default flags that every channel implementation must implement<br>
+     * However, there are five default flags that every channel implementation must implement<br>
      * SEND_OPTIONS_USE_ACK - Message is sent and an ACK is received when the message has been received by the recipient<br>
      * If no ack is received, the message is not considered successful<br>
      * @see #send(Member[], Serializable , int)
@@ -148,7 +148,7 @@
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel interceptors
      * as the message passes through the channel stack. <br>
-     * However, there are four default flags that every channel implementation must implement<br>
+     * However, there are five default flags that every channel implementation must implement<br>
      * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and 
      * processed by the recipient<br>
      * If no ack is received, the message is not considered successful<br>
@@ -161,7 +161,7 @@
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel interceptors
      * as the message passes through the channel stack. <br>
-     * However, there are four default flags that every channel implementation must implement<br>
+     * However, there are five default flags that every channel implementation must implement<br>
      * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and 
      * processed by the recipient<br>
      * If no ack is received, the message is not considered successful<br>
@@ -170,12 +170,23 @@
      */
     public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008;
     
+    /**
+     * Send options, when a message is sent, it can have an option flag
+     * to trigger certain behavior. Most flags are used to trigger channel interceptors
+     * as the message passes through the channel stack. <br>
+     * However, there are five default flags that every channel implementation must implement<br>
+     * SEND_OPTIONS_SECURE - Message is sent over an encrypted channel<br>
+     * @see #send(Member[], Serializable , int)
+     * @see #send(Member[], Serializable, int, ErrorHandler)
+     */
+    public static final int SEND_OPTIONS_SECURE = 0x0010;
+    
 
     /**
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel interceptors
      * as the message passes through the channel stack. <br>
-     * However, there are four default flags that every channel implementation must implement<br>
+     * However, there are five default flags that every channel implementation must implement<br>
      * SEND_OPTIONS_DEFAULT - the default sending options, just a helper variable. <br>
      * The default is <code>int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;</code><br>
      * @see #SEND_OPTIONS_USE_ACK

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=454797&r1=454796&r2=454797
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Oct 10 09:03:36 2006
@@ -46,18 +46,9 @@
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 
-import java.util.ConcurrentModificationException;
-
 /**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
  *
- * @author not attributable
+ * @author Filip Hanik
  * @version 1.0
  */
 public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
@@ -81,19 +72,68 @@
 //------------------------------------------------------------------------------
 //              INSTANCE VARIABLES
 //------------------------------------------------------------------------------
-    private transient long rpcTimeout = 5000;
-    private transient Channel channel;
-    private transient RpcChannel rpcChannel;
-    private transient byte[] mapContextName;
-    private transient boolean stateTransferred = false;
-    private transient Object stateMutex = new Object();
-    private transient HashMap mapMembers = new HashMap();
-    private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
-    private transient Object mapOwner;
-    private transient ClassLoader[] externalLoaders;
+    
+    /**
+     * Timeout for RPC messages, how long we will wait for a reply
+     */
+    protected transient long rpcTimeout = 5000;
+    /**
+     * Reference to the channel for sending messages
+     */
+    protected transient Channel channel;
+    /**
+     * The RpcChannel to send RPC messages through
+     */
+    protected transient RpcChannel rpcChannel;
+    /**
+     * The map context name makes this map unique; this
+     * allows us to have more than one map shared
+     * through one channel
+     */
+    protected transient byte[] mapContextName;
+    /**
+     * Has the state been transferred
+     */
+    protected transient boolean stateTransferred = false;
+    /**
+     * Simple lock object for transfers
+     */
+    protected transient Object stateMutex = new Object();
+    /**
+     * A list of members in our map
+     */
+    protected transient HashMap mapMembers = new HashMap();
+    /**
+     * Our default send options
+     */
+    protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+    /**
+     * The owner of this map, for example a SessionManager
+     */
+    protected transient Object mapOwner;
+    /**
+     * External class loaders to use if serialization and deserialization are to be performed successfully.
+     */
+    protected transient ClassLoader[] externalLoaders;
+    
+    /**
+     * The node we are currently backing up data to, this index will rotate
+     * on a round robin basis
+     */
     protected transient int currentNode = 0;
-    private transient long accessTimeout = 5000;
-    private transient String mapname = "";
+    
+    /**
+     * Since the map keeps internal membership
+     * this is the timeout for a ping message to be responded to
+     * If a remote map doesn't respond within this timeframe, 
+     * it's considered dead.
+     */
+    protected transient long accessTimeout = 5000;
+    
+    /**
+     * Readable string of the mapContextName value
+     */
+    protected transient String mapname = "";
     
 
 //------------------------------------------------------------------------------
@@ -122,12 +162,27 @@
         
     }
 
+    /**
+     * Helper method, wraps a single member in an array
+     * @param m Member
+     * @return Member[]
+     */
     protected Member[] wrap(Member m) {
         if ( m == null ) return new Member[0];
         else return new Member[] {m};
     }
 
-    private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
+    /**
+     * Initializes the map by creating the RPC channel and registering itself as a channel listener.
+     * This method is also responsible for initiating the state transfer.
+     * @param owner Object
+     * @param channel Channel
+     * @param mapContextName String
+     * @param timeout long
+     * @param channelSendOptions int
+     * @param cls ClassLoader[]
+     */
+    protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
         log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
         this.mapOwner = owner;
         this.externalLoaders = cls;
@@ -147,32 +202,52 @@
 
         //create an rpc channel and add the map as a listener
         this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+        //add this map as a message listener
         this.channel.addChannelListener(this);
+        //listen for membership notifications
         this.channel.addMembershipListener(this);
         
         
         try {
+            //broadcast our map, this just notifies other members of our existence
             broadcast(MapMessage.MSG_INIT, true);
             //transfer state from another map
             transferState();
+            //state is transferred, we are ready for messaging
             broadcast(MapMessage.MSG_START, true);
         } catch (ChannelException x) {
             log.warn("Unable to send map start message.");
             throw new RuntimeException("Unable to start replicated map.",x);
         }
-
     }
     
     
-    private void ping(long timeout) throws ChannelException {
+    /**
+     * Sends a ping out to all the members in the cluster, not just map members,
+     * to notify them that this map is alive.
+     * @param timeout long
+     * @throws ChannelException
+     */
+    protected void ping(long timeout) throws ChannelException {
         //send out a map membership message, only wait for the first reply
-        MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT,
-                                        false, null, null, null, wrap(channel.getLocalMember(false)));
-        Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout);
-        for (int i = 0; i < resp.length; i++) {
-            memberAlive(resp[i].getSource());
-        }//for
-
+        MapMessage msg = new MapMessage(this.mapContextName, 
+                                        MapMessage.MSG_INIT,
+                                        false, 
+                                        null, 
+                                        null, 
+                                        null, 
+                                        wrap(channel.getLocalMember(false)));
+        if ( channel.getMembers().length > 0 ) {
+            //send a ping, wait for all nodes to reply
+            Response[] resp = rpcChannel.send(channel.getMembers(), 
+                                              msg, rpcChannel.ALL_REPLY, 
+                                              (channelSendOptions),
+                                              (int) accessTimeout);
+            for (int i = 0; i < resp.length; i++) {
+                memberAlive(resp[i].getSource());
+            } //for
+        }
+        //update our map of members, expire some if we didn't receive a ping back
         synchronized (mapMembers) {
             Iterator it = mapMembers.entrySet().iterator();
             long now = System.currentTimeMillis();
@@ -184,7 +259,11 @@
         }//synch
     }
 
-    private void memberAlive(Member member) {
+    /**
+     * We have received a member alive notification
+     * @param member Member
+     */
+    protected void memberAlive(Member member) {
         synchronized (mapMembers) {
             if (!mapMembers.containsKey(member)) {
                 mapMemberAdded(member);
@@ -192,8 +271,14 @@
             mapMembers.put(member, new Long(System.currentTimeMillis()));
         }
     }
-
-    private void broadcast(int msgtype, boolean rpc) throws ChannelException {
+    
+    /**
+     * Helper method to broadcast a message to all members in a channel
+     * @param msgtype int
+     * @param rpc boolean
+     * @throws ChannelException
+     */
+    protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
         //send out a map membership message, only wait for the first reply
         MapMessage msg = new MapMessage(this.mapContextName, msgtype,
                                         false, null, null, null, wrap(channel.getLocalMember(false)));



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org