You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/22 02:57:43 UTC

svn commit: r387686 - in /tomcat/container/tc5.5.x/modules: groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ groupcom/src/share/org/apache/catalina/tribes/tipis/ groupcom/test/org/apache/catalina/tribes/demos/ ha/src/share/org/apache/catalina/ha/...

Author: fhanik
Date: Tue Mar 21 17:57:39 2006
New Revision: 387686

URL: http://svn.apache.org/viewcvs?rev=387686&view=rev
Log:
Initial version of the backup manager, still super buggy, need to figure out a way on how to deserialize sessions.

Added:
    tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
    tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java
    tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
    tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java
    tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Tue Mar 21 17:57:39 2006
@@ -216,11 +216,12 @@
             setConnected(false);
             if ( socketChannel != null ) {
                 try {
-                    Socket socket = socketChannel.socket();
+                    Socket socket = null;
+                    //socket = socketChannel.socket();
                     //error free close, all the way
-                    try {socket.shutdownOutput();}catch ( Exception x){}
-                    try {socket.shutdownInput();}catch ( Exception x){}
-                    try {socket.close();}catch ( Exception x){}
+                    //try {socket.shutdownOutput();}catch ( Exception x){}
+                    //try {socket.shutdownInput();}catch ( Exception x){}
+                    //try {socket.close();}catch ( Exception x){}
                     try {socketChannel.close();}catch ( Exception x){}
                     socket = null;
                 }finally {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Mar 21 17:57:39 2006
@@ -64,6 +64,11 @@
      * The load factor used when none specified in constructor.
      **/
     public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+    
+    /**
+     * Used to identify the map
+     */
+    final String chset = "ISO-8859-1";
 
 //------------------------------------------------------------------------------
 //              INSTANCE VARIABLES
@@ -76,7 +81,7 @@
     private transient Object stateMutex = new Object();
     private transient ArrayList mapMembers = new ArrayList();
     private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
-    private transient Object mapOwner;
+    private transient MapOwner mapOwner;
 
 //------------------------------------------------------------------------------
 //              CONSTRUCTORS
@@ -90,7 +95,7 @@
      * @param initialCapacity int - the size of this map, see HashMap
      * @param loadFactor float - load factor, see HashMap
      */
-    public AbstractReplicatedMap(Object owner,
+    public AbstractReplicatedMap(MapOwner owner,
                                  Channel channel, 
                                  long timeout, 
                                  String mapContextName, 
@@ -106,9 +111,9 @@
         return new Member[] {m};
     }
 
-    private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) {
+    private void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) {
         this.mapOwner = owner;
-        final String chset = "ISO-8859-1";
+        
         this.channelSendOptions = channelSendOptions;
         this.channel = channel;
         this.rpcTimeout = timeout;
@@ -132,6 +137,7 @@
                                             false, null, null, null, wrap(channel.getLocalMember(false)));
             Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, channelSendOptions, timeout);
             for (int i = 0; i < resp.length; i++) {
+                mapMemberAdded(resp[i].getSource());
                 messageReceived(resp[i].getMessage(), resp[i].getSource());
             }
         } catch (ChannelException x) {
@@ -140,6 +146,7 @@
 
         //transfer state from another map
         transferState();
+        printMap();
     }
 
     public void breakdown() {
@@ -305,6 +312,7 @@
 
         //backup request
         if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
+            System.out.println("Received a retrieve request for id:"+mapmsg.getKey());
             MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
             if (entry == null)return null;
             mapmsg.setValue( (Serializable) entry.getValue());
@@ -377,6 +385,7 @@
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
+            System.out.println("Received a backup request for id:"+mapmsg.getKey());
             MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
             if (entry == null) {
                 entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
@@ -386,7 +395,6 @@
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
                     ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                 }
-                super.put(entry.getKey(), entry);
             } else {
                 entry.setBackup(true);
                 entry.setProxy(false);
@@ -414,8 +422,9 @@
                     entry.setValue(mapmsg.getValue());
                 } //end if
             } //end if
+            super.put(entry.getKey(), entry);
         } //end if
-
+        printMap();
     }
 
     public boolean accept(Serializable msg, Member sender) {
@@ -426,10 +435,15 @@
     }
 
     public void mapMemberAdded(Member member) {
+        System.out.println("Received Member added:"+member.getName());
+        if ( member.equals(getChannel().getLocalMember(false)) ) return;        
+        System.out.println("Received Member added2:"+member.getName());
         //select a backup node if we don't have one
         synchronized (mapMembers) {
             if (!mapMembers.contains(member) ) mapMembers.add(member);
         }
+        System.out.println("Received Member added3:"+member.getName());
+        printMap();
         synchronized (stateMutex) {
             Iterator i = super.entrySet().iterator();
             while (i.hasNext()) {
@@ -460,6 +474,8 @@
     }
 
     public void memberDisappeared(Member member) {
+        Exception ex = new Exception("[DEBUG] Removing member:"+member.getName());
+        ex.printStackTrace();
         synchronized (mapMembers) {
             mapMembers.remove(member);
         }
@@ -498,13 +514,41 @@
 //              METHODS TO OVERRIDE    
 //------------------------------------------------------------------------------
   
+    protected void printMap() {
+        try {
+            System.out.println("\nMap["+((Object)this).toString()+"; " + new String(mapContextName, chset) + ", Map Size:" + super.size());
+            Member[] mbrs = getMapMembers();
+            for ( int i=0; i<mbrs.length;i++ ) {
+                System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
+            }
+            Iterator i = super.entrySet().iterator();
+            int cnt = 0;
 
+            while (i.hasNext()) {
+                Map.Entry e = (Map.Entry) i.next();
+                System.out.println( (++cnt) + ". " + e.getValue());
+            }
+            System.out.println("EndMap]\n\n");
+        }catch ( Exception ignore) {
+            ignore.printStackTrace();
+        }
+    }
+
+//------------------------------------------------------------------------------
+//                Map Owner - serialization/deserialization
+//------------------------------------------------------------------------------
+    public static interface MapOwner {
+        
+        public byte[] serialize(Object mapObject) throws IOException;
+        
+        public Serializable deserialize(byte[] data) throws ClassNotFoundException,IOException;
+        
+    }
 
 //------------------------------------------------------------------------------
 //                Map Entry class
 //------------------------------------------------------------------------------
-    public static class MapEntry
-        implements Map.Entry {
+    public static class MapEntry implements Map.Entry {
         private boolean backup;
         private boolean proxy;
         private Member[] backupNodes;
@@ -564,29 +608,12 @@
             return key;
         }
 
-        public byte[] getDiff() throws IOException {
-            if (isDiffable()) {
-                return ( (ReplicatedMapEntry) value).getDiff();
-            } else {
-                return getData();
-            }
-        }
-
         public int hashCode() {
-            return value.hashCode();
+            return key.hashCode();
         }
 
         public boolean equals(Object o) {
-            return value.equals(o);
-        }
-
-        /**
-         * returns the entire object as a byte array
-         * @return byte[]
-         * @throws IOException
-         */
-        public byte[] getData() throws IOException {
-            return (new ObjectStreamable(value)).getBuf().getArray();
+            return key.equals(o);
         }
 
         /**
@@ -614,6 +641,16 @@
                 value = XByteBuffer.deserialize(data, offset, length);
             }
         }
+        
+        public String toString() {
+            StringBuffer buf = new StringBuffer("MapEntry[key:");
+            buf.append(getKey()).append("; ");
+            buf.append("value:").append(getValue()).append("; ");
+            buf.append("primary:").append(isPrimary()).append("; ");
+            buf.append("backup:").append(isBackup()).append("; ");
+            buf.append("proxy:").append(isProxy()).append(";]");
+            return buf.toString();
+        }
 
     }
 
@@ -640,8 +677,7 @@
 
         public MapMessage() {}
 
-        public MapMessage(byte[] mapId,
-                          int msgtype, boolean diff,
+        public MapMessage(byte[] mapId,int msgtype, boolean diff,
                           Serializable key, Serializable value,
                           byte[] diffvalue, Member[] nodes) {
             this.mapId = mapId;
@@ -751,9 +787,8 @@
                     out.write(d);
                 }
             }
-            
         }
-
+        
         public void writeExternal(ObjectOutput out) throws IOException {
             out.writeInt(mapId.length);
             out.write(mapId);
@@ -803,60 +838,6 @@
         }
     } //MapMessage
 
-//------------------------------------------------------------------------------
-//                streamable class
-//------------------------------------------------------------------------------
-
-    public static class ObjectStreamable
-        implements Streamable {
-        private DirectByteArrayOutputStream buf;
-        private int pos = 0;
-        public ObjectStreamable(Serializable value) throws IOException {
-            buf = new DirectByteArrayOutputStream(1024);
-            ObjectOutputStream out = new ObjectOutputStream(buf);
-            out.writeObject(value);
-            out.flush();
-        }
-
-        /**
-         * returns true if the stream has reached its end
-         * @return boolean
-         */
-        public synchronized boolean eof() {
-            return (pos >= buf.size());
-
-        }
-
-        /**
-         * write data into the byte array starting at offset, maximum bytes read are (data.length-offset)
-         * @param data byte[] - the array to read data into
-         * @param offset int - start position for writing data
-         * @return int - the number of bytes written into the data buffer
-         */
-        public synchronized int write(byte[] data, int offset) throws IOException {
-            int length = Math.min(data.length - offset, buf.size() - pos);
-            System.arraycopy(buf.getArrayDirect(), pos, data, offset, length);
-            pos = pos + length;
-            return length;
-        }
-
-        public synchronized int read(byte[] data, int offset, int length) throws IOException {
-            return -1;
-        }
-
-        public DirectByteArrayOutputStream getBuf() {
-            return buf;
-        }
-
-        public int size() {
-            return buf.size();
-        }
-
-        public int pos() {
-            return pos;
-        }
-
-    }
 
     public Channel getChannel() {
         return channel;
@@ -886,7 +867,7 @@
         return mapOwner;
     }
 
-    public void setMapOwner(Object mapOwner) {
+    public void setMapOwner(MapOwner mapOwner) {
         this.mapOwner = mapOwner;
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Tue Mar 21 17:57:39 2006
@@ -29,6 +29,7 @@
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner;
 
 /**
  * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. 
@@ -84,7 +85,7 @@
          * @param initialCapacity int - the size of this map, see HashMap
          * @param loadFactor float - load factor, see HashMap
          */
-        public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
+        public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
             super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT);
         }
 
@@ -95,7 +96,7 @@
          * @param mapContextName String - unique name for this map, to allow multiple maps per channel
          * @param initialCapacity int - the size of this map, see HashMap
          */
-        public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+        public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
             super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
         }
 
@@ -105,7 +106,7 @@
          * @param timeout long - timeout for RPC messags
          * @param mapContextName String - unique name for this map, to allow multiple maps per channel
          */
-        public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+        public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName) {
             super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
         }
 
@@ -142,29 +143,39 @@
     }
     
     public Object get(Object key) {
+        System.out.println("Getting session id:"+key);
+        printMap();
         MapEntry entry = (MapEntry)super.get(key);
         if ( entry == null ) return null;
         if ( !entry.isPrimary() ) {
             //if the message is not primary, we need to retrieve the latest value
             try {
-                MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
-                                                (Serializable) key, null, null, null);
-                Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
-                if (resp == null || resp.length == 0) {
-                    //no responses
-                    log.warn("Unable to retrieve remote object for key:" + key);
-                    return null;
-                }
-                msg = (MapMessage) resp[0].getMessage();
                 
-                Member[] backup = entry.getBackupNodes();
-                if ( entry.getValue() instanceof ReplicatedMapEntry ) {
-                    ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
-                    val.setOwner(getMapOwner());
+                Member[] backup = null;
+                MapMessage msg = null;
+                if ( !entry.isBackup() ) {
+                    //make sure we don't retrieve from ourselves
+                    System.out.println("Retrieving from remote session id:"+key);
+                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
+                                         (Serializable) key, null, null, null);
+                    Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
+                    if (resp == null || resp.length == 0) {
+                        //no responses
+                        log.warn("Unable to retrieve remote object for key:" + key);
+                        return null;
+                    }
+                    msg = (MapMessage) resp[0].getMessage();
+
+                    backup = entry.getBackupNodes();
+                    if ( entry.getValue() instanceof ReplicatedMapEntry ) {
+                        ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+                        val.setOwner(getMapOwner());
+                    }
+                    entry.setValue(msg.getValue());
                 }
                 if (entry.isBackup()) {
                     //select a new backup node
-                    backup = publishEntryInfo(key, msg.getValue());
+                    backup = publishEntryInfo(key, entry.getValue());
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
                     msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
@@ -174,7 +185,7 @@
                 entry.setBackupNodes(backup);
                 entry.setBackup(false);
                 entry.setProxy(false);
-                entry.setValue(msg.getValue());
+
                 
             } catch (ChannelException x) {
                 log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
@@ -197,6 +208,7 @@
 
     
     public Object put(Object key, Object value) {
+        System.out.println("Adding session id:"+key);
         if ( !(key instanceof Serializable) ) throw new IllegalArgumentException("Key is not serializable:"+key.getClass().getName());
         if ( value == null ) return remove(key);
         if ( !(value instanceof Serializable) ) throw new IllegalArgumentException("Value is not serializable:"+value.getClass().getName());
@@ -216,6 +228,7 @@
             log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
         }
         super.put(key,entry);
+        printMap();
         return old;
     }
 
@@ -293,7 +306,7 @@
         while ( i.hasNext() ) {
             Map.Entry e = (Map.Entry)i.next();
             MapEntry entry = (MapEntry)e.getValue();
-            if ( entry.isPrimary() ) set.add(entry.getValue());
+            if ( entry.isPrimary() ) set.add(entry);
         }
         return Collections.unmodifiableSet(set);
     }
@@ -342,7 +355,7 @@
         while ( i.hasNext() ) {
             Map.Entry e = (Map.Entry)i.next();
             MapEntry entry = (MapEntry)e.getValue();
-            if ( entry.isPrimary() ) values.add(entry.getValue());
+            if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
         }
         return Collections.unmodifiableCollection(values);
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Mar 21 17:57:39 2006
@@ -29,6 +29,7 @@
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner;
 
 /**
  * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical 
@@ -66,7 +67,7 @@
      * @param initialCapacity int - the size of this map, see HashMap
      * @param loadFactor float - load factor, see HashMap
      */
-    public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) {
+    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) {
         super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT);
     }
 
@@ -77,7 +78,7 @@
      * @param mapContextName String - unique name for this map, to allow multiple maps per channel
      * @param initialCapacity int - the size of this map, see HashMap
      */
-    public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
         super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
     }
 
@@ -87,7 +88,7 @@
      * @param timeout long - timeout for RPC messags
      * @param mapContextName String - unique name for this map, to allow multiple maps per channel
      */
-    public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName) {
         super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 17:57:39 2006
@@ -45,7 +45,7 @@
     protected SimpleTableDemo table;
     
     public MapDemo(Channel channel ) {
-        map = new LazyReplicatedMap(this,channel,5000, "MapDemo");
+        map = new LazyReplicatedMap(null,channel,5000, "MapDemo");
         table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
         channel.addChannelListener(this);
         channel.addMembershipListener(this);

Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java Tue Mar 21 17:57:39 2006
@@ -123,6 +123,7 @@
     public Manager getManager(String name);
     public void removeManager(String name,Manager manager);
     public void addManager(String name,Manager manager);
+    public String getManagerName(String name, Manager manager);
     public Valve[] getValves();
     
     public void setChannel(Channel channel);

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=387686&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java Tue Mar 21 17:57:39 2006
@@ -0,0 +1,265 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.ha.session;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.catalina.LifecycleException;
+import org.apache.catalina.Loader;
+import org.apache.catalina.Session;
+import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
+import org.apache.catalina.ha.ClusterMessage;
+import org.apache.catalina.session.StandardManager;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.ReplicationStream;
+import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+
+/**
+ *@author Filip Hanik
+ *@version 1.0
+ */
+public class BackupManager extends StandardManager implements ClusterManager
+{
+    public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( BackupManager.class );
+
+    protected static long DEFAULT_REPL_TIMEOUT = 15000;//15 seconds
+
+    /** Set to true if we don't want the sessions to expire on shutdown */
+    protected boolean mExpireSessionsOnShutdown = true;
+    
+    /**
+     * The name of this manager
+     */
+    protected String name;
+
+    /**
+     * A reference to the cluster
+     */
+    protected CatalinaCluster cluster;
+    
+    /**
+     * Should listeners be notified?
+     */
+    private boolean notifyListenersOnReplication;
+
+
+    /**
+     * Constructor, just calls super()
+     *
+     */
+    public BackupManager() {
+        super();
+    }
+
+
+//******************************************************************************/
+//      ClusterManager Interface     
+//******************************************************************************/
+
+    public void messageDataReceived(ClusterMessage msg) {
+    }
+
+    public boolean isSendClusterDomainOnly() {
+        return false;
+    }
+
+    /**
+     * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
+     */
+    public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
+    }
+
+    /**
+     * @return Returns the defaultMode.
+     */
+    public boolean isDefaultMode() {
+        return false;
+    }
+    /**
+     * @param defaultMode The defaultMode to set.
+     */
+    public void setDefaultMode(boolean defaultMode) {
+    }
+
+    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
+    {
+        mExpireSessionsOnShutdown = expireSessionsOnShutdown;
+    }
+
+    public void setCluster(CatalinaCluster cluster) {
+        if(log.isDebugEnabled())
+            log.debug("Cluster associated with SimpleTcpReplicationManager");
+        this.cluster = cluster;
+    }
+
+    public boolean getExpireSessionsOnShutdown()
+    {
+        return mExpireSessionsOnShutdown;
+    }
+
+
+    /**
+     * Override persistence since they don't go hand in hand with replication for now.
+     */
+    public void unload() throws IOException {
+    }
+    
+    public ClusterMessage requestCompleted(String sessionId) {
+        LazyReplicatedMap map = (LazyReplicatedMap)sessions;
+        map.replicate(sessionId,false);
+        return null;
+    }
+
+
+//=========================================================================
+// OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
+//=========================================================================
+
+    public Session createEmptySession() {
+        return new DeltaSession(this);
+    }
+
+    /**
+     * Open Stream and use correct ClassLoader (Container) Switch
+     * ThreadClassLoader
+     * 
+     * @param data
+     * @return The object input stream
+     * @throws IOException
+     */
+    public ReplicationStream getReplicationStream(byte[] data) throws IOException {
+        return getReplicationStream(data,0,data.length);
+    }
+
+    public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException {
+        ByteArrayInputStream fis =null;
+        ReplicationStream ois = null;
+        Loader loader = null;
+        ClassLoader classLoader = null;
+        //fix to be able to run the DeltaManager
+        //stand alone without a container.
+        //use the Threads context class loader
+        if (container != null) loader = container.getLoader();
+        if (loader != null) classLoader = loader.getClassLoader();
+        else classLoader = Thread.currentThread().getContextClassLoader();
+        //end fix
+        fis = new ByteArrayInputStream(data, offset, length);
+        if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
+            ois = new ReplicationStream(fis, new ClassLoader[] {classLoader});
+        } else {
+            ois = new ReplicationStream(fis, new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()});
+        }
+        return ois;
+    }    
+
+
+
+
+    public String getName() {
+        return this.name;
+    }
+    /**
+     * Prepare for the beginning of active use of the public methods of this
+     * component.  This method should be called after <code>configure()</code>,
+     * and before any of the public methods of the component are utilized.<BR>
+     * Starts the cluster communication channel, this will connect with the other nodes
+     * in the cluster, and request the current session state to be transferred to this node.
+     * @exception IllegalStateException if this component has already been
+     *  started
+     * @exception LifecycleException if this component detects a fatal error
+     *  that prevents this component from being used
+     */
+    public void start() throws LifecycleException {
+        if ( this.started ) return;
+
+
+        //start the javagroups channel
+        try {
+            CatalinaCluster catclust = (CatalinaCluster)cluster;
+            catclust.addManager(getName(), this);
+            this.sessions = new LazyReplicatedMap(this,
+                                                  catclust.getChannel(),
+                                                  DEFAULT_REPL_TIMEOUT,
+                                                  getMapName());
+            super.start();
+        }  catch ( Exception x ) {
+            log.error("Unable to start BackupManager",x);
+            throw new LifecycleException("Failed to start BackupManager",x);
+        }
+    }
+    
+    public String getMapName() {
+        CatalinaCluster catclust = (CatalinaCluster)cluster;
+        Member local = catclust.getLocalMember();
+        return catclust.getManagerName(getName(),this)+"-"+local.getDomain();
+    }
+
+    /**
+     * Gracefully terminate the active use of the public methods of this
+     * component.  This method should be the last one called on a given
+     * instance of this component.<BR>
+     * This will disconnect the cluster communication channel and stop the listener thread.
+     * @exception IllegalStateException if this component has not been started
+     * @exception LifecycleException if this component detects a fatal error
+     *  that needs to be reported
+     */
+    public void stop() throws LifecycleException
+    {
+        if ( !this.started ) return;
+        super.stop();
+        try {
+            cluster.removeManager(getName(),this);
+            LazyReplicatedMap map = (LazyReplicatedMap)sessions;
+            map.breakdown();
+        } catch ( Exception x ){
+            log.error("Unable to stop BackupManager",x);
+            throw new LifecycleException("Failed to stop BackupManager",x);
+        }
+    }
+
+    public void setDistributable(boolean dist) {
+        this.distributable = dist;
+    }
+
+    public boolean getDistributable() {
+        return distributable;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+    public boolean isNotifyListenersOnReplication() {
+        return notifyListenersOnReplication;
+    }
+    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
+        this.notifyListenersOnReplication = notifyListenersOnReplication;
+    }
+
+
+    /* 
+     * @see org.apache.catalina.ha.ClusterManager#getCluster()
+     */
+    public CatalinaCluster getCluster() {
+        return cluster;
+    }
+    
+    public String[] getInvalidatedSessions() {
+        return new String[0];
+    }
+
+}

Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java Tue Mar 21 17:57:39 2006
@@ -261,10 +261,13 @@
      * @param manager
      *            The manager with which this Session is associated
      */
+    public DeltaSession() {
+        this.resetDeltaRequest();
+    }
+    
     public DeltaSession(Manager manager) {
-        super();
+        this();
         this.manager = manager;
-        this.resetDeltaRequest();
     }
 
     // ----------------------------------------------------- ReplicatedMapEntry
@@ -332,7 +335,8 @@
         }
         
         public void setOwner(Object owner) {
-            if ( owner instanceof ClusterManager ) {
+            if ( owner instanceof ClusterManager && getManager()==null) {
+                System.out.println("Setting owner for session:"+getIdInternal()+" to:"+owner);
                 ClusterManager cm = (ClusterManager)owner;
                 this.setManager(cm);
                 this.setValid(true);
@@ -777,7 +781,9 @@
                                            ((DeltaManager)manager).getName(), 
                                            new Boolean(isPrimarySession()), 
                                            expiredId));
-                ( (DeltaManager) manager).sessionExpired(expiredId);
+                if ( manager instanceof DeltaManager ) {
+                    ( (DeltaManager) manager).sessionExpired(expiredId);
+                }
             }
         }
 

Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java Tue Mar 21 17:57:39 2006
@@ -59,11 +59,9 @@
  * When a session is replicated (not an attribute added/removed) the session is serialized into
  * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
  */
-public class SimpleTcpReplicationManager extends StandardManager
-implements ClusterManager
+public class SimpleTcpReplicationManager extends StandardManager implements ClusterManager
 {
-    public static org.apache.commons.logging.Log log =
-        org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
+    public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
 
     //the channel configuration
     protected String mChannelConfig = null;

Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Tue Mar 21 17:57:39 2006
@@ -474,9 +474,7 @@
      * @see DeltaManager#start()
      */
     public synchronized Manager createManager(String name) {
-        if (log.isDebugEnabled())
-            log.debug("Creating ClusterManager for context " + name
-                    + " using class " + getManagerClassName());
+        if (log.isDebugEnabled()) log.debug("Creating ClusterManager for context " + name + " using class " + getManagerClassName());
         Manager manager = null;
         try {
             manager = (Manager) getClass().getClassLoader().loadClass(getManagerClassName()).newInstance();
@@ -505,14 +503,11 @@
     public void removeManager(String name,Manager manager) {
         if (manager != null) {
             // Notify our interested LifecycleListeners
-            lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,
-                    manager);
+            lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,manager);
             managers.remove(getManagerName(name,manager));
-            if (manager instanceof ClusterManager)
-                ((ClusterManager) manager).setCluster(null);
+            if (manager instanceof ClusterManager) ((ClusterManager) manager).setCluster(null);
             // Notify our interested LifecycleListeners
-            lifecycle
-                    .fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
+            lifecycle.fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
         }
     }
 
@@ -528,8 +523,7 @@
      */
     public void addManager(String name, Manager manager) {
         if (!manager.getDistributable()) {
-            log.warn("Manager with name " + name
-                    + " is not distributable, can't add as cluster manager");
+            log.warn("Manager with name " + name + " is not distributable, can't add as cluster manager");
             return;
         }
         // Notify our interested LifecycleListeners
@@ -539,8 +533,7 @@
             ClusterManager cmanager = (ClusterManager) manager ;
             cmanager.setName(clusterName);
             cmanager.setCluster(this);
-            if(cmanager.isDefaultMode())
-                transferProperty("manager",cmanager);
+            if(cmanager.isDefaultMode()) transferProperty("manager",cmanager);
         }
         managers.put(clusterName, manager);
         // Notify our interested LifecycleListeners
@@ -552,7 +545,7 @@
      * @param manager
      * @return
      */
-    private String getManagerName(String name, Manager manager) {
+    public String getManagerName(String name, Manager manager) {
         String clusterName = name ;
         if(getContainer() instanceof Engine) {
             Container context = manager.getContainer() ;
@@ -627,8 +620,7 @@
      */
     public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
         if (log.isTraceEnabled())
-            log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent
-                    .getType(), lifecycleEvent.getData()));
+            log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent.getType(), lifecycleEvent.getData()));
     }
 
     // ------------------------------------------------------ public



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org