You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/22 02:57:43 UTC
svn commit: r387686 - in /tomcat/container/tc5.5.x/modules:
groupcom/src/share/org/apache/catalina/tribes/tcp/nio/
groupcom/src/share/org/apache/catalina/tribes/tipis/
groupcom/test/org/apache/catalina/tribes/demos/
ha/src/share/org/apache/catalina/ha/...
Author: fhanik
Date: Tue Mar 21 17:57:39 2006
New Revision: 387686
URL: http://svn.apache.org/viewcvs?rev=387686&view=rev
Log:
Initial version of the backup manager; still super buggy. Need to figure out how to deserialize sessions.
Added:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Tue Mar 21 17:57:39 2006
@@ -216,11 +216,12 @@
setConnected(false);
if ( socketChannel != null ) {
try {
- Socket socket = socketChannel.socket();
+ Socket socket = null;
+ //socket = socketChannel.socket();
//error free close, all the way
- try {socket.shutdownOutput();}catch ( Exception x){}
- try {socket.shutdownInput();}catch ( Exception x){}
- try {socket.close();}catch ( Exception x){}
+ //try {socket.shutdownOutput();}catch ( Exception x){}
+ //try {socket.shutdownInput();}catch ( Exception x){}
+ //try {socket.close();}catch ( Exception x){}
try {socketChannel.close();}catch ( Exception x){}
socket = null;
}finally {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Mar 21 17:57:39 2006
@@ -64,6 +64,11 @@
* The load factor used when none specified in constructor.
**/
public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+ /**
+ * Name of the character set used when decoding the map context name bytes (see printMap)
+ */
+ final String chset = "ISO-8859-1";
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
@@ -76,7 +81,7 @@
private transient Object stateMutex = new Object();
private transient ArrayList mapMembers = new ArrayList();
private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
- private transient Object mapOwner;
+ private transient MapOwner mapOwner;
//------------------------------------------------------------------------------
// CONSTRUCTORS
@@ -90,7 +95,7 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public AbstractReplicatedMap(Object owner,
+ public AbstractReplicatedMap(MapOwner owner,
Channel channel,
long timeout,
String mapContextName,
@@ -106,9 +111,9 @@
return new Member[] {m};
}
- private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) {
+ private void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) {
this.mapOwner = owner;
- final String chset = "ISO-8859-1";
+
this.channelSendOptions = channelSendOptions;
this.channel = channel;
this.rpcTimeout = timeout;
@@ -132,6 +137,7 @@
false, null, null, null, wrap(channel.getLocalMember(false)));
Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, channelSendOptions, timeout);
for (int i = 0; i < resp.length; i++) {
+ mapMemberAdded(resp[i].getSource());
messageReceived(resp[i].getMessage(), resp[i].getSource());
}
} catch (ChannelException x) {
@@ -140,6 +146,7 @@
//transfer state from another map
transferState();
+ printMap();
}
public void breakdown() {
@@ -305,6 +312,7 @@
//backup request
if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
+ System.out.println("Received a retrieve request for id:"+mapmsg.getKey());
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null)return null;
mapmsg.setValue( (Serializable) entry.getValue());
@@ -377,6 +385,7 @@
}
if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
+ System.out.println("Received a backup request for id:"+mapmsg.getKey());
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
@@ -386,7 +395,6 @@
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
}
- super.put(entry.getKey(), entry);
} else {
entry.setBackup(true);
entry.setProxy(false);
@@ -414,8 +422,9 @@
entry.setValue(mapmsg.getValue());
} //end if
} //end if
+ super.put(entry.getKey(), entry);
} //end if
-
+ printMap();
}
public boolean accept(Serializable msg, Member sender) {
@@ -426,10 +435,15 @@
}
public void mapMemberAdded(Member member) {
+ System.out.println("Received Member added:"+member.getName());
+ if ( member.equals(getChannel().getLocalMember(false)) ) return;
+ System.out.println("Received Member added2:"+member.getName());
//select a backup node if we don't have one
synchronized (mapMembers) {
if (!mapMembers.contains(member) ) mapMembers.add(member);
}
+ System.out.println("Received Member added3:"+member.getName());
+ printMap();
synchronized (stateMutex) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
@@ -460,6 +474,8 @@
}
public void memberDisappeared(Member member) {
+ Exception ex = new Exception("[DEBUG] Removing member:"+member.getName());
+ ex.printStackTrace();
synchronized (mapMembers) {
mapMembers.remove(member);
}
@@ -498,13 +514,41 @@
// METHODS TO OVERRIDE
//------------------------------------------------------------------------------
+ protected void printMap() { // Debug aid: dumps this map's toString(), context name, size, members and entries to System.out.
+ try {
+ System.out.println("\nMap["+((Object)this).toString()+"; " + new String(mapContextName, chset) + ", Map Size:" + super.size()); // header line; the context name bytes are decoded with chset
+ Member[] mbrs = getMapMembers();
+ for ( int i=0; i<mbrs.length;i++ ) { // one line per member returned by getMapMembers(), numbered from 1
+ System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
+ }
+ Iterator i = super.entrySet().iterator(); // calls the superclass entrySet() directly rather than this class's own version
+ int cnt = 0;
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ System.out.println( (++cnt) + ". " + e.getValue()); // numbered from 1; prints the stored value's toString()
+ }
+ System.out.println("EndMap]\n\n");
+ }catch ( Exception ignore) { // a failure while printing is reported via printStackTrace() and not rethrown
+ ignore.printStackTrace();
+ }
+ }
+
+//------------------------------------------------------------------------------
+// Map Owner - serialization/deserialization
+//------------------------------------------------------------------------------
+ public static interface MapOwner { // Type of the owner object accepted by the map's constructor and setMapOwner; declares byte[] conversion hooks.
+
+ public byte[] serialize(Object mapObject) throws IOException; // produces a byte[] for the given map object; NOTE(review): no caller is visible in this patch
+
+ public Serializable deserialize(byte[] data) throws ClassNotFoundException,IOException; // presumably the inverse of serialize - confirm against implementations
+
+ }
//------------------------------------------------------------------------------
// Map Entry class
//------------------------------------------------------------------------------
- public static class MapEntry
- implements Map.Entry {
+ public static class MapEntry implements Map.Entry {
private boolean backup;
private boolean proxy;
private Member[] backupNodes;
@@ -564,29 +608,12 @@
return key;
}
- public byte[] getDiff() throws IOException {
- if (isDiffable()) {
- return ( (ReplicatedMapEntry) value).getDiff();
- } else {
- return getData();
- }
- }
-
public int hashCode() {
- return value.hashCode();
+ return key.hashCode();
}
public boolean equals(Object o) {
- return value.equals(o);
- }
-
- /**
- * returns the entire object as a byte array
- * @return byte[]
- * @throws IOException
- */
- public byte[] getData() throws IOException {
- return (new ObjectStreamable(value)).getBuf().getArray();
+ return key.equals(o);
}
/**
@@ -614,6 +641,16 @@
value = XByteBuffer.deserialize(data, offset, length);
}
}
+
+ public String toString() { // Debug rendering: key, value and the primary/backup/proxy flags in one bracketed string.
+ StringBuffer buf = new StringBuffer("MapEntry[key:");
+ buf.append(getKey()).append("; ");
+ buf.append("value:").append(getValue()).append("; ");
+ buf.append("primary:").append(isPrimary()).append("; ");
+ buf.append("backup:").append(isBackup()).append("; ");
+ buf.append("proxy:").append(isProxy()).append(";]");
+ return buf.toString();
+ }
}
@@ -640,8 +677,7 @@
public MapMessage() {}
- public MapMessage(byte[] mapId,
- int msgtype, boolean diff,
+ public MapMessage(byte[] mapId,int msgtype, boolean diff,
Serializable key, Serializable value,
byte[] diffvalue, Member[] nodes) {
this.mapId = mapId;
@@ -751,9 +787,8 @@
out.write(d);
}
}
-
}
-
+
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(mapId.length);
out.write(mapId);
@@ -803,60 +838,6 @@
}
} //MapMessage
-//------------------------------------------------------------------------------
-// streamable class
-//------------------------------------------------------------------------------
-
- public static class ObjectStreamable
- implements Streamable {
- private DirectByteArrayOutputStream buf;
- private int pos = 0;
- public ObjectStreamable(Serializable value) throws IOException {
- buf = new DirectByteArrayOutputStream(1024);
- ObjectOutputStream out = new ObjectOutputStream(buf);
- out.writeObject(value);
- out.flush();
- }
-
- /**
- * returns true if the stream has reached its end
- * @return boolean
- */
- public synchronized boolean eof() {
- return (pos >= buf.size());
-
- }
-
- /**
- * write data into the byte array starting at offset, maximum bytes read are (data.length-offset)
- * @param data byte[] - the array to read data into
- * @param offset int - start position for writing data
- * @return int - the number of bytes written into the data buffer
- */
- public synchronized int write(byte[] data, int offset) throws IOException {
- int length = Math.min(data.length - offset, buf.size() - pos);
- System.arraycopy(buf.getArrayDirect(), pos, data, offset, length);
- pos = pos + length;
- return length;
- }
-
- public synchronized int read(byte[] data, int offset, int length) throws IOException {
- return -1;
- }
-
- public DirectByteArrayOutputStream getBuf() {
- return buf;
- }
-
- public int size() {
- return buf.size();
- }
-
- public int pos() {
- return pos;
- }
-
- }
public Channel getChannel() {
return channel;
@@ -886,7 +867,7 @@
return mapOwner;
}
- public void setMapOwner(Object mapOwner) {
+ public void setMapOwner(MapOwner mapOwner) {
this.mapOwner = mapOwner;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Tue Mar 21 17:57:39 2006
@@ -29,6 +29,7 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner;
/**
* A smart implementation of a stateful replicated map. uses primary/secondary backup strategy.
@@ -84,7 +85,7 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
+ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT);
}
@@ -95,7 +96,7 @@
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
}
@@ -105,7 +106,7 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
*/
- public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName) {
super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
}
@@ -142,29 +143,39 @@
}
public Object get(Object key) {
+ System.out.println("Getting session id:"+key);
+ printMap();
MapEntry entry = (MapEntry)super.get(key);
if ( entry == null ) return null;
if ( !entry.isPrimary() ) {
//if the message is not primary, we need to retrieve the latest value
try {
- MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
- (Serializable) key, null, null, null);
- Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
- if (resp == null || resp.length == 0) {
- //no responses
- log.warn("Unable to retrieve remote object for key:" + key);
- return null;
- }
- msg = (MapMessage) resp[0].getMessage();
- Member[] backup = entry.getBackupNodes();
- if ( entry.getValue() instanceof ReplicatedMapEntry ) {
- ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
- val.setOwner(getMapOwner());
+ Member[] backup = null;
+ MapMessage msg = null;
+ if ( !entry.isBackup() ) {
+ //make sure we don't retrieve from ourselves
+ System.out.println("Retrieving from remote session id:"+key);
+ msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
+ (Serializable) key, null, null, null);
+ Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
+ if (resp == null || resp.length == 0) {
+ //no responses
+ log.warn("Unable to retrieve remote object for key:" + key);
+ return null;
+ }
+ msg = (MapMessage) resp[0].getMessage();
+
+ backup = entry.getBackupNodes();
+ if ( entry.getValue() instanceof ReplicatedMapEntry ) {
+ ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+ val.setOwner(getMapOwner());
+ }
+ entry.setValue(msg.getValue());
}
if (entry.isBackup()) {
//select a new backup node
- backup = publishEntryInfo(key, msg.getValue());
+ backup = publishEntryInfo(key, entry.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
@@ -174,7 +185,7 @@
entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
- entry.setValue(msg.getValue());
+
} catch (ChannelException x) {
log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
@@ -197,6 +208,7 @@
public Object put(Object key, Object value) {
+ System.out.println("Adding session id:"+key);
if ( !(key instanceof Serializable) ) throw new IllegalArgumentException("Key is not serializable:"+key.getClass().getName());
if ( value == null ) return remove(key);
if ( !(value instanceof Serializable) ) throw new IllegalArgumentException("Value is not serializable:"+value.getClass().getName());
@@ -216,6 +228,7 @@
log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
}
super.put(key,entry);
+ printMap();
return old;
}
@@ -293,7 +306,7 @@
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry)i.next();
MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() ) set.add(entry.getValue());
+ if ( entry.isPrimary() ) set.add(entry);
}
return Collections.unmodifiableSet(set);
}
@@ -342,7 +355,7 @@
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry)i.next();
MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() ) values.add(entry.getValue());
+ if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
}
return Collections.unmodifiableCollection(values);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Mar 21 17:57:39 2006
@@ -29,6 +29,7 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner;
/**
* All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical
@@ -66,7 +67,7 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) {
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) {
super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT);
}
@@ -77,7 +78,7 @@
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
}
@@ -87,7 +88,7 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
*/
- public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName) {
super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 17:57:39 2006
@@ -45,7 +45,7 @@
protected SimpleTableDemo table;
public MapDemo(Channel channel ) {
- map = new LazyReplicatedMap(this,channel,5000, "MapDemo");
+ map = new LazyReplicatedMap(null,channel,5000, "MapDemo");
table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
channel.addChannelListener(this);
channel.addMembershipListener(this);
Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java Tue Mar 21 17:57:39 2006
@@ -123,6 +123,7 @@
public Manager getManager(String name);
public void removeManager(String name,Manager manager);
public void addManager(String name,Manager manager);
+ public String getManagerName(String name, Manager manager);
public Valve[] getValves();
public void setChannel(Channel channel);
Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=387686&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java Tue Mar 21 17:57:39 2006
@@ -0,0 +1,265 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.ha.session;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.catalina.LifecycleException;
+import org.apache.catalina.Loader;
+import org.apache.catalina.Session;
+import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
+import org.apache.catalina.ha.ClusterMessage;
+import org.apache.catalina.session.StandardManager;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.ReplicationStream;
+import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+
+/** Session manager whose session store is a LazyReplicatedMap, installed by start() and broken down by stop().
+ *@author Filip Hanik
+ *@version 1.0
+ */
+public class BackupManager extends StandardManager implements ClusterManager
+{
+ public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( BackupManager.class );
+
+ protected static long DEFAULT_REPL_TIMEOUT = 15000;//15 seconds; passed to the LazyReplicatedMap as its timeout in start()
+
+ /** Backs get/setExpireSessionsOnShutdown and is not otherwise read in this class. NOTE(review): the earlier wording ("set to true if we don't want the sessions to expire on shutdown") contradicts the field name - confirm the intended meaning */
+ protected boolean mExpireSessionsOnShutdown = true;
+
+ /**
+ * The name of this manager
+ */
+ protected String name;
+
+ /**
+ * A reference to the cluster
+ */
+ protected CatalinaCluster cluster;
+
+ /**
+ * Should listeners be notified?
+ */
+ private boolean notifyListenersOnReplication;
+
+
+ /**
+ * Constructor, just calls super()
+ *
+ */
+ public BackupManager() {
+ super();
+ }
+
+
+//******************************************************************************/
+// ClusterManager Interface
+//******************************************************************************/
+
+ public void messageDataReceived(ClusterMessage msg) { // no-op: cluster messages delivered here are ignored
+ }
+
+ public boolean isSendClusterDomainOnly() { // always false; the setter below stores nothing
+ return false;
+ }
+
+ /**
+ * @param sendClusterDomainOnly ignored; this manager does not store the flag.
+ */
+ public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
+ }
+
+ /**
+ * @return always false.
+ */
+ public boolean isDefaultMode() {
+ return false;
+ }
+ /**
+ * @param defaultMode ignored; the value is not stored.
+ */
+ public void setDefaultMode(boolean defaultMode) {
+ }
+
+ public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
+ {
+ mExpireSessionsOnShutdown = expireSessionsOnShutdown;
+ }
+
+ public void setCluster(CatalinaCluster cluster) {
+ if(log.isDebugEnabled())
+ log.debug("Cluster associated with SimpleTcpReplicationManager"); // NOTE(review): message names SimpleTcpReplicationManager although this class is BackupManager - likely copy/paste, confirm
+ this.cluster = cluster;
+ }
+
+ public boolean getExpireSessionsOnShutdown()
+ {
+ return mExpireSessionsOnShutdown;
+ }
+
+
+ /**
+ * Overridden as a no-op: session persistence and replication do not go hand in hand for now.
+ */
+ public void unload() throws IOException {
+ }
+
+ public ClusterMessage requestCompleted(String sessionId) {
+ LazyReplicatedMap map = (LazyReplicatedMap)sessions; // sessions is the LazyReplicatedMap assigned in start()
+ map.replicate(sessionId,false); // presumably pushes this session's state to its backup; meaning of the 'false' flag is not visible here - TODO confirm
+ return null; // always null
+ }
+
+
+//=========================================================================
+// OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
+//=========================================================================
+
+ public Session createEmptySession() {
+ return new DeltaSession(this); // a new DeltaSession constructed with this manager
+ }
+
+ /**
+ * Opens a ReplicationStream over the whole array; see
+ * getReplicationStream(byte[],int,int) for the class loaders used.
+ *
+ * @param data the bytes to read from
+ * @return The object input stream
+ * @throws IOException if the three-argument overload throws it
+ */
+ public ReplicationStream getReplicationStream(byte[] data) throws IOException {
+ return getReplicationStream(data,0,data.length);
+ }
+
+ public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException {
+ ByteArrayInputStream fis =null;
+ ReplicationStream ois = null;
+ Loader loader = null;
+ ClassLoader classLoader = null;
+ //allows this manager to run
+ //stand alone without a container:
+ //fall back to the thread's context class loader
+ if (container != null) loader = container.getLoader();
+ if (loader != null) classLoader = loader.getClassLoader();
+ else classLoader = Thread.currentThread().getContextClassLoader();
+ //end fix
+ fis = new ByteArrayInputStream(data, offset, length);
+ if ( classLoader == Thread.currentThread().getContextClassLoader() ) { // same loader: pass it only once
+ ois = new ReplicationStream(fis, new ClassLoader[] {classLoader});
+ } else { // array order: container loader, then the thread's context loader
+ ois = new ReplicationStream(fis, new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()});
+ }
+ return ois;
+ }
+
+
+
+
+ public String getName() {
+ return this.name;
+ }
+ /**
+ * Prepare for the beginning of active use of the public methods of this
+ * component. This method should be called after <code>configure()</code>,
+ * and before any of the public methods of the component are utilized.<BR>
+ * Registers this manager with the cluster and installs a LazyReplicatedMap on the
+ * cluster's channel as the session store, then delegates to super.start().
+ * Returns immediately, without error, if this component has already been
+ * started.
+ * @exception LifecycleException if any of the steps above throws; the
+ * caught exception is logged and passed to the LifecycleException
+ */
+ public void start() throws LifecycleException {
+ if ( this.started ) return;
+
+
+ //register with the cluster and create the replicated session map
+ try {
+ CatalinaCluster catclust = (CatalinaCluster)cluster;
+ catclust.addManager(getName(), this);
+ this.sessions = new LazyReplicatedMap(this,
+ catclust.getChannel(),
+ DEFAULT_REPL_TIMEOUT,
+ getMapName());
+ super.start();
+ } catch ( Exception x ) {
+ log.error("Unable to start BackupManager",x);
+ throw new LifecycleException("Failed to start BackupManager",x);
+ }
+ }
+
+ public String getMapName() { // builds "<getManagerName(...) from the cluster>-<local member's getDomain()>"
+ CatalinaCluster catclust = (CatalinaCluster)cluster;
+ Member local = catclust.getLocalMember();
+ return catclust.getManagerName(getName(),this)+"-"+local.getDomain(); // NOTE(review): if getDomain() returns a byte[] this concatenation yields the array's default toString() - confirm its return type
+ }
+
+ /**
+ * Gracefully terminate the active use of the public methods of this
+ * component. This method should be the last one called on a given
+ * instance of this component.<BR>
+ * Calls super.stop(), then unregisters from the cluster and breaks down the replicated session map.
+ * Returns immediately, without error, if this component has not been started.
+ * @exception LifecycleException if unregistering or the map breakdown throws;
+ * super.stop() is called outside the try block, so its exceptions propagate as thrown
+ */
+ public void stop() throws LifecycleException
+ {
+ if ( !this.started ) return;
+ super.stop();
+ try {
+ cluster.removeManager(getName(),this);
+ LazyReplicatedMap map = (LazyReplicatedMap)sessions;
+ map.breakdown();
+ } catch ( Exception x ){
+ log.error("Unable to stop BackupManager",x);
+ throw new LifecycleException("Failed to stop BackupManager",x);
+ }
+ }
+
+ public void setDistributable(boolean dist) {
+ this.distributable = dist;
+ }
+
+ public boolean getDistributable() {
+ return distributable;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ public boolean isNotifyListenersOnReplication() {
+ return notifyListenersOnReplication;
+ }
+ public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
+ this.notifyListenersOnReplication = notifyListenersOnReplication;
+ }
+
+
+ /*
+ * @see org.apache.catalina.ha.ClusterManager#getCluster()
+ */
+ public CatalinaCluster getCluster() {
+ return cluster;
+ }
+
+ public String[] getInvalidatedSessions() {
+ return new String[0]; // always an empty array
+ }
+
+}
Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java Tue Mar 21 17:57:39 2006
@@ -261,10 +261,13 @@
* @param manager
* The manager with which this Session is associated
*/
+ public DeltaSession() {
+ this.resetDeltaRequest();
+ }
+
public DeltaSession(Manager manager) {
- super();
+ this();
this.manager = manager;
- this.resetDeltaRequest();
}
// ----------------------------------------------------- ReplicatedMapEntry
@@ -332,7 +335,8 @@
}
public void setOwner(Object owner) {
- if ( owner instanceof ClusterManager ) {
+ if ( owner instanceof ClusterManager && getManager()==null) {
+ System.out.println("Setting owner for session:"+getIdInternal()+" to:"+owner);
ClusterManager cm = (ClusterManager)owner;
this.setManager(cm);
this.setValid(true);
@@ -777,7 +781,9 @@
((DeltaManager)manager).getName(),
new Boolean(isPrimarySession()),
expiredId));
- ( (DeltaManager) manager).sessionExpired(expiredId);
+ if ( manager instanceof DeltaManager ) {
+ ( (DeltaManager) manager).sessionExpired(expiredId);
+ }
}
}
Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java Tue Mar 21 17:57:39 2006
@@ -59,11 +59,9 @@
* When a session is replicated (not an attribute added/removed) the session is serialized into
* a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
*/
-public class SimpleTcpReplicationManager extends StandardManager
-implements ClusterManager
+public class SimpleTcpReplicationManager extends StandardManager implements ClusterManager
{
- public static org.apache.commons.logging.Log log =
- org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
+ public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
//the channel configuration
protected String mChannelConfig = null;
Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=387686&r1=387685&r2=387686&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Tue Mar 21 17:57:39 2006
@@ -474,9 +474,7 @@
* @see DeltaManager#start()
*/
public synchronized Manager createManager(String name) {
- if (log.isDebugEnabled())
- log.debug("Creating ClusterManager for context " + name
- + " using class " + getManagerClassName());
+ if (log.isDebugEnabled()) log.debug("Creating ClusterManager for context " + name + " using class " + getManagerClassName());
Manager manager = null;
try {
manager = (Manager) getClass().getClassLoader().loadClass(getManagerClassName()).newInstance();
@@ -505,14 +503,11 @@
public void removeManager(String name,Manager manager) {
if (manager != null) {
// Notify our interested LifecycleListeners
- lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,
- manager);
+ lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,manager);
managers.remove(getManagerName(name,manager));
- if (manager instanceof ClusterManager)
- ((ClusterManager) manager).setCluster(null);
+ if (manager instanceof ClusterManager) ((ClusterManager) manager).setCluster(null);
// Notify our interested LifecycleListeners
- lifecycle
- .fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
+ lifecycle.fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
}
}
@@ -528,8 +523,7 @@
*/
public void addManager(String name, Manager manager) {
if (!manager.getDistributable()) {
- log.warn("Manager with name " + name
- + " is not distributable, can't add as cluster manager");
+ log.warn("Manager with name " + name + " is not distributable, can't add as cluster manager");
return;
}
// Notify our interested LifecycleListeners
@@ -539,8 +533,7 @@
ClusterManager cmanager = (ClusterManager) manager ;
cmanager.setName(clusterName);
cmanager.setCluster(this);
- if(cmanager.isDefaultMode())
- transferProperty("manager",cmanager);
+ if(cmanager.isDefaultMode()) transferProperty("manager",cmanager);
}
managers.put(clusterName, manager);
// Notify our interested LifecycleListeners
@@ -552,7 +545,7 @@
* @param manager
* @return
*/
- private String getManagerName(String name, Manager manager) {
+ public String getManagerName(String name, Manager manager) {
String clusterName = name ;
if(getContainer() instanceof Engine) {
Container context = manager.getContainer() ;
@@ -627,8 +620,7 @@
*/
public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
if (log.isTraceEnabled())
- log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent
- .getType(), lifecycleEvent.getData()));
+ log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent.getType(), lifecycleEvent.getData()));
}
// ------------------------------------------------------ public
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org