You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 05:18:02 UTC

svn commit: r467222 [7/31] - in /tomcat/tc6.0.x/trunk/java: javax/annotation/ javax/annotation/security/ javax/ejb/ javax/el/ javax/mail/ javax/mail/internet/ javax/persistence/ javax/servlet/ javax/servlet/http/ javax/servlet/jsp/ javax/servlet/jsp/el...

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java Mon Oct 23 20:17:11 2006
@@ -1,1520 +1,1520 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.ha.session;
-
-import java.beans.PropertyChangeEvent;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-
-import org.apache.catalina.Cluster;
-import org.apache.catalina.Container;
-import org.apache.catalina.Context;
-import org.apache.catalina.Engine;
-import org.apache.catalina.Host;
-import org.apache.catalina.LifecycleException;
-import org.apache.catalina.LifecycleListener;
-import org.apache.catalina.Session;
-import org.apache.catalina.Valve;
-import org.apache.catalina.core.StandardContext;
-import org.apache.catalina.ha.CatalinaCluster;
-import org.apache.catalina.ha.ClusterMessage;
-import org.apache.catalina.ha.tcp.ReplicationValve;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.ReplicationStream;
-import org.apache.catalina.util.LifecycleSupport;
-import org.apache.catalina.util.StringManager;
-import org.apache.catalina.ha.ClusterManager;
-
-/**
- * The DeltaManager manages replicated sessions by only replicating the deltas
- * in data. For applications written to handle this, the DeltaManager is the
- * optimal way of replicating data.
- * 
- * This code is almost identical to StandardManager with a difference in how it
- * persists sessions and some modifications to it.
- * 
- * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
- * reloading depends upon external calls to the <code>start()</code> and
- * <code>stop()</code> methods of this class at the correct times.
- * 
- * @author Filip Hanik
- * @author Craig R. McClanahan
- * @author Jean-Francois Arcand
- * @author Peter Rossbach
- * @version $Revision: 380100 $ $Date: 2006-02-23 06:08:14 -0600 (Thu, 23 Feb 2006) $
- */
-
-public class DeltaManager extends ClusterManagerBase{
-
-    // ---------------------------------------------------- Security Classes
-    public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
-
-    /**
-     * The string manager for this package.
-     */
-    protected static StringManager sm = StringManager.getManager(Constants.Package);
-
-    // ----------------------------------------------------- Instance Variables
-
-    /**
-     * The descriptive information about this implementation.
-     */
-    private static final String info = "DeltaManager/2.1";
-
-    /**
-     * Has this component been started yet?
-     */
-    private boolean started = false;
-
-    /**
-     * The descriptive name of this Manager implementation (for logging).
-     */
-    protected static String managerName = "DeltaManager";
-    protected String name = null;
-    protected boolean defaultMode = false;
-    private CatalinaCluster cluster = null;
-
-    /**
-     * cached replication valve cluster container!
-     */
-    private ReplicationValve replicationValve = null ;
-    
-    /**
-     * The lifecycle event support for this component.
-     */
-    protected LifecycleSupport lifecycle = new LifecycleSupport(this);
-
-    /**
-     * The maximum number of active Sessions allowed, or -1 for no limit.
-     */
-    private int maxActiveSessions = -1;
-    private boolean expireSessionsOnShutdown = false;
-    private boolean notifyListenersOnReplication = true;
-    private boolean notifySessionListenersOnReplication = true;
-    private boolean stateTransfered = false ;
-    private int stateTransferTimeout = 60;
-    private boolean sendAllSessions = true;
-    private boolean sendClusterDomainOnly = true ;
-    private int sendAllSessionsSize = 1000 ;
-    
-    /**
-     * wait time between send session block (default 2 sec) 
-     */
-    private int sendAllSessionsWaitTime = 2 * 1000 ; 
-    private ArrayList receivedMessageQueue = new ArrayList() ;
-    private boolean receiverQueue = false ;
-    private boolean stateTimestampDrop = true ;
-    private long stateTransferCreateSendTime; 
-    
-    // ------------------------------------------------------------------ stats attributes
-    
-    int rejectedSessions = 0;
-    private long sessionReplaceCounter = 0 ;
-    long processingTime = 0;
-    private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
-    private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
-    private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
-    private long counterReceive_EVT_SESSION_CREATED = 0 ;
-    private long counterReceive_EVT_SESSION_EXPIRED = 0;
-    private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
-    private long counterReceive_EVT_SESSION_DELTA = 0;
-    private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
-    private long counterSend_EVT_SESSION_CREATED = 0;
-    private long counterSend_EVT_SESSION_DELTA = 0 ;
-    private long counterSend_EVT_SESSION_ACCESSED = 0;
-    private long counterSend_EVT_SESSION_EXPIRED = 0;
-    private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
-    private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
-    private int counterNoStateTransfered = 0 ;
-
-
-    // ------------------------------------------------------------- Constructor
-    public DeltaManager() {
-        super();
-    }
-
-    // ------------------------------------------------------------- Properties
-    
-    /**
-     * Return descriptive information about this Manager implementation and the
-     * corresponding version number, in the format
-     * <code>&lt;description&gt;/&lt;version&gt;</code>.
-     */
-    public String getInfo() {
-        return info;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * Return the descriptive short name of this Manager implementation.
-     */
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
-     */
-    public long getCounterSend_EVT_GET_ALL_SESSIONS() {
-        return counterSend_EVT_GET_ALL_SESSIONS;
-    }
-    
-    /**
-     * @return Returns the counterSend_EVT_SESSION_ACCESSED.
-     */
-    public long getCounterSend_EVT_SESSION_ACCESSED() {
-        return counterSend_EVT_SESSION_ACCESSED;
-    }
-    
-    /**
-     * @return Returns the counterSend_EVT_SESSION_CREATED.
-     */
-    public long getCounterSend_EVT_SESSION_CREATED() {
-        return counterSend_EVT_SESSION_CREATED;
-    }
-
-    /**
-     * @return Returns the counterSend_EVT_SESSION_DELTA.
-     */
-    public long getCounterSend_EVT_SESSION_DELTA() {
-        return counterSend_EVT_SESSION_DELTA;
-    }
-
-    /**
-     * @return Returns the counterSend_EVT_SESSION_EXPIRED.
-     */
-    public long getCounterSend_EVT_SESSION_EXPIRED() {
-        return counterSend_EVT_SESSION_EXPIRED;
-    }
- 
-    /**
-     * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
-     */
-    public long getCounterSend_EVT_ALL_SESSION_DATA() {
-        return counterSend_EVT_ALL_SESSION_DATA;
-    }
-
-    /**
-     * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
-     */
-    public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
-        return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
-    }
- 
-    /**
-     * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
-     */
-    public long getCounterReceive_EVT_ALL_SESSION_DATA() {
-        return counterReceive_EVT_ALL_SESSION_DATA;
-    }
-    
-    /**
-     * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
-     */
-    public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
-        return counterReceive_EVT_GET_ALL_SESSIONS;
-    }
-    
-    /**
-     * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
-     */
-    public long getCounterReceive_EVT_SESSION_ACCESSED() {
-        return counterReceive_EVT_SESSION_ACCESSED;
-    }
-    
-    /**
-     * @return Returns the counterReceive_EVT_SESSION_CREATED.
-     */
-    public long getCounterReceive_EVT_SESSION_CREATED() {
-        return counterReceive_EVT_SESSION_CREATED;
-    }
-    
-    /**
-     * @return Returns the counterReceive_EVT_SESSION_DELTA.
-     */
-    public long getCounterReceive_EVT_SESSION_DELTA() {
-        return counterReceive_EVT_SESSION_DELTA;
-    }
-    
-    /**
-     * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
-     */
-    public long getCounterReceive_EVT_SESSION_EXPIRED() {
-        return counterReceive_EVT_SESSION_EXPIRED;
-    }
-    
-    
-    /**
-     * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
-     */
-    public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
-        return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
-    }
-    
-    /**
-     * @return Returns the processingTime.
-     */
-    public long getProcessingTime() {
-        return processingTime;
-    }
- 
-    /**
-     * @return Returns the sessionReplaceCounter.
-     */
-    public long getSessionReplaceCounter() {
-        return sessionReplaceCounter;
-    }
-    
-    /**
-     * Number of session creations that failed due to maxActiveSessions
-     * 
-     * @return The count
-     */
-    public int getRejectedSessions() {
-        return rejectedSessions;
-    }
-
-    public void setRejectedSessions(int rejectedSessions) {
-        this.rejectedSessions = rejectedSessions;
-    }
-
-    /**
-     * @return Returns the counterNoStateTransfered.
-     */
-    public int getCounterNoStateTransfered() {
-        return counterNoStateTransfered;
-    }
-    
-    public int getReceivedQueueSize() {
-        return receivedMessageQueue.size() ;
-    }
-    
-    /**
-     * @return Returns the stateTransferTimeout.
-     */
-    public int getStateTransferTimeout() {
-        return stateTransferTimeout;
-    }
-    /**
-     * @param timeoutAllSession The timeout
-     */
-    public void setStateTransferTimeout(int timeoutAllSession) {
-        this.stateTransferTimeout = timeoutAllSession;
-    }
-
-    /**
-     * is session state transfered complete?
-     * 
-     */
-    public boolean getStateTransfered() {
-        return stateTransfered;
-    }
-
-    /**
-     * set that state ist complete transfered  
-     * @param stateTransfered
-     */
-    public void setStateTransfered(boolean stateTransfered) {
-        this.stateTransfered = stateTransfered;
-    }
-    
-    /**
-     * @return Returns the sendAllSessionsWaitTime in msec
-     */
-    public int getSendAllSessionsWaitTime() {
-        return sendAllSessionsWaitTime;
-    }
-    
-    /**
-     * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
-     */
-    public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
-        this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
-    }
-    
-    /**
-     * @return Returns the sendClusterDomainOnly.
-     */
-    public boolean doDomainReplication() {
-        return sendClusterDomainOnly;
-    }
-    
-    /**
-     * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
-     */
-    public void setDomainReplication(boolean sendClusterDomainOnly) {
-        this.sendClusterDomainOnly = sendClusterDomainOnly;
-    }
-
-    /**
-     * @return Returns the stateTimestampDrop.
-     */
-    public boolean isStateTimestampDrop() {
-        return stateTimestampDrop;
-    }
-    
-    /**
-     * @param isTimestampDrop The new flag value
-     */
-    public void setStateTimestampDrop(boolean isTimestampDrop) {
-        this.stateTimestampDrop = isTimestampDrop;
-    }
-    
-    /**
-     * Return the maximum number of active Sessions allowed, or -1 for no limit.
-     */
-    public int getMaxActiveSessions() {
-        return (this.maxActiveSessions);
-    }
-
-    /**
-     * Set the maximum number of actives Sessions allowed, or -1 for no limit.
-     * 
-     * @param max
-     *            The new maximum number of sessions
-     */
-    public void setMaxActiveSessions(int max) {
-        int oldMaxActiveSessions = this.maxActiveSessions;
-        this.maxActiveSessions = max;
-        support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions));
-    }
-    
-    /**
-     * 
-     * @return Returns the sendAllSessions.
-     */
-    public boolean isSendAllSessions() {
-        return sendAllSessions;
-    }
-    
-    /**
-     * @param sendAllSessions The sendAllSessions to set.
-     */
-    public void setSendAllSessions(boolean sendAllSessions) {
-        this.sendAllSessions = sendAllSessions;
-    }
-    
-    /**
-     * @return Returns the sendAllSessionsSize.
-     */
-    public int getSendAllSessionsSize() {
-        return sendAllSessionsSize;
-    }
-    
-    /**
-     * @param sendAllSessionsSize The sendAllSessionsSize to set.
-     */
-    public void setSendAllSessionsSize(int sendAllSessionsSize) {
-        this.sendAllSessionsSize = sendAllSessionsSize;
-    }
-    
-    /**
-     * @return Returns the notifySessionListenersOnReplication.
-     */
-    public boolean isNotifySessionListenersOnReplication() {
-        return notifySessionListenersOnReplication;
-    }
-    
-    /**
-     * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
-     */
-    public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) {
-        this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
-    }
-    
-    
-    public boolean isExpireSessionsOnShutdown() {
-        return expireSessionsOnShutdown;
-    }
-
-    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
-        this.expireSessionsOnShutdown = expireSessionsOnShutdown;
-    }
-    
-    public boolean isNotifyListenersOnReplication() {
-        return notifyListenersOnReplication;
-    }
-
-    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
-        this.notifyListenersOnReplication = notifyListenersOnReplication;
-    }
-
-    
-    /**
-     * @return Returns the defaultMode.
-     */
-    public boolean isDefaultMode() {
-        return defaultMode;
-    }
-    /**
-     * @param defaultMode The defaultMode to set.
-     */
-    public void setDefaultMode(boolean defaultMode) {
-        this.defaultMode = defaultMode;
-    }
-    
-    public CatalinaCluster getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(CatalinaCluster cluster) {
-        this.cluster = cluster;
-    }
-
-    /**
-     * Set the Container with which this Manager has been associated. If it is a
-     * Context (the usual case), listen for changes to the session timeout
-     * property.
-     * 
-     * @param container
-     *            The associated Container
-     */
-    public void setContainer(Container container) {
-        // De-register from the old Container (if any)
-        if ((this.container != null) && (this.container instanceof Context))
-            ((Context) this.container).removePropertyChangeListener(this);
-
-        // Default processing provided by our superclass
-        super.setContainer(container);
-
-        // Register with the new Container (if any)
-        if ((this.container != null) && (this.container instanceof Context)) {
-            setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
-            ((Context) this.container).addPropertyChangeListener(this);
-        }
-
-    }
-    
-    // --------------------------------------------------------- Public Methods
-
-    /**
-     * Construct and return a new session object, based on the default settings
-     * specified by this Manager's properties. The session id will be assigned
-     * by this method, and available via the getId() method of the returned
-     * session. If a new session cannot be created for any reason, return
-     * <code>null</code>.
-     * 
-     * @exception IllegalStateException
-     *                if a new session cannot be instantiated for any reason
-     * 
-     * Construct and return a new session object, based on the default settings
-     * specified by this Manager's properties. The session id will be assigned
-     * by this method, and available via the getId() method of the returned
-     * session. If a new session cannot be created for any reason, return
-     * <code>null</code>.
-     * 
-     * @exception IllegalStateException
-     *                if a new session cannot be instantiated for any reason
-     */
-    public Session createSession(String sessionId) {
-        return createSession(sessionId, true);
-    }
-
-    /**
-     * create new session with check maxActiveSessions and send session creation
-     * to other cluster nodes.
-     * 
-     * @param distribute
-     * @return The session
-     */
-    public Session createSession(String sessionId, boolean distribute) {
-        if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
-            rejectedSessions++;
-            throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
-        }
-        DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
-        if (distribute) {
-            sendCreateSession(session.getId(), session);
-        }
-        if (log.isDebugEnabled())
-            log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
-        return (session);
-
-    }
-
-    /**
-     * Send create session evt to all backup node
-     * @param sessionId
-     * @param session
-     */
-    protected void sendCreateSession(String sessionId, DeltaSession session) {
-        if(cluster.getMembers().length > 0 ) {
-            SessionMessage msg = 
-                new SessionMessageImpl(getName(),
-                                       SessionMessage.EVT_SESSION_CREATED, 
-                                       null, 
-                                       sessionId,
-                                       sessionId + "-" + System.currentTimeMillis());
-            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
-            msg.setTimestamp(session.getCreationTime());
-            counterSend_EVT_SESSION_CREATED++;
-            send(msg);
-        }
-    }
-    
-    /**
-     * Send messages to other backup member (domain or all)
-     * @param msg Session message
-     */
-    protected void send(SessionMessage msg) {
-        if(cluster != null) {
-            if(doDomainReplication())
-                cluster.sendClusterDomain(msg);
-            else
-                cluster.send(msg);
-        }
-    }
-
-    /**
-     * Create DeltaSession
-     * @see org.apache.catalina.Manager#createEmptySession()
-     */
-    public Session createEmptySession() {
-        return getNewDeltaSession() ;
-    }
-    
-    /**
-     * Get new session class to be used in the doLoad() method.
-     */
-    protected DeltaSession getNewDeltaSession() {
-        return new DeltaSession(this);
-    }
-
-    /**
-     * Load Deltarequest from external node
-     * Load the Class at container classloader
-     * @see DeltaRequest#readExternal(java.io.ObjectInput)
-     * @param session
-     * @param data message data
-     * @return The request
-     * @throws ClassNotFoundException
-     * @throws IOException
-     */
-    protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException {
-        ReplicationStream ois = getReplicationStream(data);
-        session.getDeltaRequest().readExternal(ois);
-        ois.close();
-        return session.getDeltaRequest();
-    }
-
-    /**
-     * serialize DeltaRequest
-     * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
-     * 
-     * @param deltaRequest
-     * @return serialized delta request
-     * @throws IOException
-     */
-    protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest) throws IOException {
-        return deltaRequest.serialize();
-    }
-
-    /**
-     * Load sessions from other cluster node.
-     * FIXME replace currently sessions with same id without notifcation.
-     * FIXME SSO handling is not really correct with the session replacement!
-     * @exception ClassNotFoundException
-     *                if a serialized class cannot be found during the reload
-     * @exception IOException
-     *                if an input/output error occurs
-     */
-    protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException {
-
-        // Initialize our internal data structures
-        //sessions.clear(); //should not do this
-        // Open an input stream to the specified pathname, if any
-        ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
-        ObjectInputStream ois = null;
-        // Load the previously unloaded active sessions
-        try {
-            ois = getReplicationStream(data);
-            Integer count = (Integer) ois.readObject();
-            int n = count.intValue();
-            for (int i = 0; i < n; i++) {
-                DeltaSession session = (DeltaSession) createEmptySession();
-                session.readObjectData(ois);
-                session.setManager(this);
-                session.setValid(true);
-                session.setPrimarySession(false);
-                //in case the nodes in the cluster are out of
-                //time synch, this will make sure that we have the
-                //correct timestamp, isValid returns true, cause
-                // accessCount=1
-                session.access();
-                //make sure that the session gets ready to expire if
-                // needed
-                session.setAccessCount(0);
-                session.resetDeltaRequest();
-                // FIXME How inform other session id cache like SingleSignOn
-                // increment sessionCounter to correct stats report
-                if (findSession(session.getIdInternal()) == null ) {
-                    sessionCounter++;
-                } else {
-                    sessionReplaceCounter++;
-                    // FIXME better is to grap this sessions again !
-                    if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
-                }
-                add(session);
-            }
-        } catch (ClassNotFoundException e) {
-            log.error(sm.getString("deltaManager.loading.cnfe", e), e);
-            throw e;
-        } catch (IOException e) {
-            log.error(sm.getString("deltaManager.loading.ioe", e), e);
-            throw e;
-        } finally {
-            // Close the input stream
-            try {
-                if (ois != null) ois.close();
-            } catch (IOException f) {
-                // ignored
-            }
-            ois = null;
-            if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
-        }
-
-    }
-
-    
-
-    /**
-     * Save any currently active sessions in the appropriate persistence
-     * mechanism, if any. If persistence is not supported, this method returns
-     * without doing anything.
-     * 
-     * @exception IOException
-     *                if an input/output error occurs
-     */
-    protected byte[] serializeSessions(Session[] currentSessions) throws IOException {
-
-        // Open an output stream to the specified pathname, if any
-        ByteArrayOutputStream fos = null;
-        ObjectOutputStream oos = null;
-
-        try {
-            fos = new ByteArrayOutputStream();
-            oos = new ObjectOutputStream(new BufferedOutputStream(fos));
-            oos.writeObject(new Integer(currentSessions.length));
-            for(int i=0 ; i < currentSessions.length;i++) {
-                ((DeltaSession)currentSessions[i]).writeObjectData(oos);                
-            }
-            // Flush and close the output stream
-            oos.flush();
-        } catch (IOException e) {
-            log.error(sm.getString("deltaManager.unloading.ioe", e), e);
-            throw e;
-        } finally {
-            if (oos != null) {
-                try {
-                    oos.close();
-                } catch (IOException f) {
-                    ;
-                }
-                oos = null;
-            }
-        }
-        // send object data as byte[]
-        return fos.toByteArray();
-    }
-
-    // ------------------------------------------------------ Lifecycle Methods
-
-    /**
-     * Add a lifecycle event listener to this component.
-     * 
-     * @param listener
-     *            The listener to add
-     */
-    public void addLifecycleListener(LifecycleListener listener) {
-        lifecycle.addLifecycleListener(listener);
-    }
-
-    /**
-     * Get the lifecycle listeners associated with this lifecycle. If this
-     * Lifecycle has no listeners registered, a zero-length array is returned.
-     */
-    public LifecycleListener[] findLifecycleListeners() {
-        return lifecycle.findLifecycleListeners();
-    }
-
-    /**
-     * Remove a lifecycle event listener from this component.
-     * 
-     * @param listener
-     *            The listener to remove
-     */
-    public void removeLifecycleListener(LifecycleListener listener) {
-        lifecycle.removeLifecycleListener(listener);
-    }
-
-    /**
-     * Prepare for the beginning of active use of the public methods of this
-     * component. This method should be called after <code>configure()</code>,
-     * and before any of the public methods of the component are utilized.
-     * 
-     * @exception LifecycleException
-     *                if this component detects a fatal error that prevents this
-     *                component from being used
-     */
-    public void start() throws LifecycleException {
-        if (!initialized) init();
-
-        // Validate and update our current component state
-        if (started) {
-            return;
-        }
-        started = true;
-        lifecycle.fireLifecycleEvent(START_EVENT, null);
-
-        // Force initialization of the random number generator
-        generateSessionId();
-
-        // Load unloaded sessions, if any
-        try {
-            //the channel is already running
-            Cluster cluster = getCluster() ;
-            // stop remove cluster binding
-            //wow, how many nested levels of if statements can we have ;)
-            if(cluster == null) {
-                Container context = getContainer() ;
-                if(context != null && context instanceof Context) {
-                     Container host = context.getParent() ;
-                     if(host != null && host instanceof Host) {
-                         cluster = host.getCluster();
-                         if(cluster != null && cluster instanceof CatalinaCluster) {
-                             setCluster((CatalinaCluster) cluster) ;
-                         } else {
-                             Container engine = host.getParent() ;
-                             if(engine != null && engine instanceof Engine) {
-                                 cluster = engine.getCluster();
-                                 if(cluster != null && cluster instanceof CatalinaCluster) {
-                                     setCluster((CatalinaCluster) cluster) ;
-                                 }
-                             } else {
-                                     cluster = null ;
-                             }
-                         }
-                     }
-                }
-            }
-            if (cluster == null) {
-                log.error(sm.getString("deltaManager.noCluster", getName()));
-                return;
-            } else {
-                if (log.isInfoEnabled()) {
-                    String type = "unknown" ;
-                    if( cluster.getContainer() instanceof Host){
-                        type = "Host" ;
-                    } else if( cluster.getContainer() instanceof Engine){
-                        type = "Engine" ;
-                    }
-                    log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
-                }
-            }
-            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
-            //to survice context reloads, as only a stop/start is called, not
-            // createManager
-            cluster.registerManager(this);
-
-            getAllClusterSessions();
-
-        } catch (Throwable t) {
-            log.error(sm.getString("deltaManager.managerLoad"), t);
-        }
-    }
-
-    /**
-     * Request the complete session state from another cluster member and wait
-     * for it to arrive.
-     * <p>
-     * Nothing is requested when there is no cluster, the cluster has no
-     * members, or {@link #findSessionMasterMember()} returns <code>null</code>.
-     * While the request is outstanding, <code>receiverQueue</code> is set so that
-     * {@link #messageDataReceived(ClusterMessage)} parks incoming session
-     * messages in <code>receivedMessageQueue</code>. When the wait ends (state
-     * received, timeout, or exception) the parked messages are replayed or
-     * dropped and queueing is switched off again.
-     *
-     * @see #findSessionMasterMember()
-     */
-    public synchronized void getAllClusterSessions() {
-        if (cluster != null && cluster.getMembers().length > 0) {
-            long beforeSendTime = System.currentTimeMillis();
-            Member mbr = findSessionMasterMember();
-            if(mbr == null) { // No domain member found
-                 return;
-            }
-            SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
-            // set reference time; handleALL_SESSION_TRANSFERCOMPLETE later
-            // overwrites it with the timestamp carried by the completion message
-            stateTransferCreateSendTime = beforeSendTime ;
-            // request session state
-            counterSend_EVT_GET_ALL_SESSIONS++;
-            stateTransfered = false ;
-            // FIXME This send call blocks the deploy thread when sender waitForAck is enabled
-            try {
-                // start parking incoming session messages before the request goes out
-                synchronized(receivedMessageQueue) {
-                     receiverQueue = true ;
-                }
-                cluster.send(msg, mbr);
-                if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
-                // FIXME In sender ack mode this method only checks the state transfer; a resend is a problem!
-                waitForSendAllSessions(beforeSendTime);
-            } finally {
-                // always drain the parked messages and stop queueing, even on failure
-                synchronized(receivedMessageQueue) {
-                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
-                        SessionMessage smsg = (SessionMessage) iter.next();
-                        if (!stateTimestampDrop) {
-                            // timestamp filtering disabled: replay every parked message
-                            messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
-                        } else {
-                            // replay only messages at least as new as the reference time;
-                            // parked EVT_GET_ALL_SESSIONS requests are never replayed here
-                            if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
-                                // FIXME handle EVT_GET_ALL_SESSIONS later
-                                messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
-                            } else {
-                                if (log.isWarnEnabled()) {
-                                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
-                                }
-                            }
-                        }
-                    }        
-                    receivedMessageQueue.clear();
-                    receiverQueue = false ;
-                }
-           }
-        } else {
-            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
-        }
-    }
-
-    /**
-     * Hand a cross context session to the cluster's replication valve.
-     * <p>
-     * The valve is looked up once and cached in <code>replicationValve</code>.
-     * The lookup only happens when the container is a cross context enabled
-     * {@link StandardContext} and the cluster is a {@link CatalinaCluster}.
-     * Without a valve the session is not registered anywhere.
-     *
-     * @param session cross context session
-     */
-    protected void registerSessionAtReplicationValve(DeltaSession session) {
-        if (replicationValve == null
-                && container instanceof StandardContext
-                && ((StandardContext) container).getCrossContext()) {
-            Cluster owner = getCluster();
-            // instanceof is false for null, so no separate null test is needed
-            if (owner instanceof CatalinaCluster) {
-                Valve[] candidates = ((CatalinaCluster) owner).getValves();
-                if (candidates != null && candidates.length > 0) {
-                    // the first ReplicationValve in the chain wins
-                    for (int idx = 0; idx < candidates.length; idx++) {
-                        if (candidates[idx] instanceof ReplicationValve) {
-                            replicationValve = (ReplicationValve) candidates[idx];
-                            break;
-                        }
-                    }
-                    if (replicationValve == null && log.isDebugEnabled()) {
-                        log.debug("no ReplicationValve found for CrossContext Support");
-                    }
-                }
-            }
-        }
-        if (replicationValve != null) {
-            replicationValve.registerReplicationSession(session);
-        }
-    }
-    
-    /**
-     * Find the member that is asked for the session state.
-     * <p>
-     * The first entry of the cluster's member array is used. A warning is
-     * logged when no member is available; a successful lookup is reported at
-     * debug level only.
-     *
-     * @return the member to request the sessions from, or <code>null</code>
-     *         when the cluster reports no usable member
-     */
-    protected Member findSessionMasterMember() {
-        Member[] members = cluster.getMembers();
-        Member master = (members != null && members.length != 0) ? members[0] : null;
-        if (master == null) {
-            if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
-        } else if (log.isDebugEnabled()) {
-            // was log.warn behind an isDebugEnabled() guard: the level now matches the guard
-            log.debug(sm.getString("deltaManager.foundMasterMember",getName(), master));
-        }
-        return master;
-    }
-
-    /**
-     * Poll every 100 ms until the cluster session state has been transferred
-     * or the wait gives up, then log the outcome.
-     * <ul>
-     * <li><code>getStateTransferTimeout() &gt; 0</code>: wait at most that many
-     * seconds.</li>
-     * <li><code>getStateTransferTimeout() == -1</code>: wait until the state has
-     * been transferred (forever mode).</li>
-     * <li>any other value: do not wait at all; the transfer is reported as
-     * failed unless it has already completed.</li>
-     * </ul>
-     * A failed transfer increments <code>counterNoStateTransfered</code> and is
-     * logged as an error.
-     *
-     * @param beforeSendTime time in milliseconds at which the state request was
-     *            created; used for the elapsed time in the log messages
-     */
-    protected void waitForSendAllSessions(long beforeSendTime) {
-        long reqStart = System.currentTimeMillis();
-        long reqNow = reqStart ;
-        boolean isTimeout = false;
-        if(getStateTransferTimeout() > 0) {
-            // wait that state is transfered with timeout check
-            do {
-                try {
-                    Thread.sleep(100);
-                } catch (Exception sleep) {
-                    // NOTE(review): an interrupt is swallowed here and polling continues
-                }
-                reqNow = System.currentTimeMillis();
-                // the timeout is configured in seconds, the clock runs in milliseconds
-                isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
-            } while ((!getStateTransfered()) && (!isTimeout));
-        } else {
-            if(getStateTransferTimeout() == -1) {
-                // wait that state is transfered
-                do {
-                    try {
-                        Thread.sleep(100);
-                    } catch (Exception sleep) {
-                        // NOTE(review): an interrupt is swallowed here, so this loop
-                        // only ends once the state has been transferred
-                    }
-                } while ((!getStateTransfered()));
-                reqNow = System.currentTimeMillis();
-            }
-        }
-        if (isTimeout || (!getStateTransfered())) {
-            counterNoStateTransfered++ ;
-            log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
-        } else {
-            if (log.isInfoEnabled())
-                log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
-        }
-    }
-
-    /**
-     * Gracefully terminate the active use of the public methods of this
-     * component. This method should be the last one called on a given instance
-     * of this component.
-     * <p>
-     * In order: fires the stop lifecycle event, expires every session that is
-     * still valid, drops the random number generator, removes this manager
-     * from the cluster, clears the cached replication valve and, if the
-     * manager was initialized, destroys it.
-     * 
-     * @exception LifecycleException
-     *                if this component has not been started
-     */
-    public void stop() throws LifecycleException {
-
-        if (log.isDebugEnabled())
-            log.debug(sm.getString("deltaManager.stopped", getName()));
-
-
-        // Validate and update our current component state
-        if (!started)
-            throw new LifecycleException(sm.getString("deltaManager.notStarted"));
-        lifecycle.fireLifecycleEvent(STOP_EVENT, null);
-        started = false;
-
-        // Expire all active sessions
-        if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
-        Session sessions[] = findSessions();
-        for (int i = 0; i < sessions.length; i++) {
-            DeltaSession session = (DeltaSession) sessions[i];
-            if (!session.isValid())
-                continue;
-            try {
-                // second argument is the expireSessionsOnShutdown setting
-                session.expire(true, isExpireSessionsOnShutdown());
-            } catch (Throwable ignore) {
-                // a failure to expire one session must not stop the shutdown
-                ;
-            } 
-        }
-
-        // Require a new random number generator if we are restarted
-        this.random = null;
-        getCluster().removeManager(this);
-        // forget the cached valve so a restart looks it up again
-        replicationValve = null;
-        if (initialized) {
-            destroy();
-        }
-    }
-
-    // ----------------------------------------- PropertyChangeListener Methods
-
-    /**
-     * Process property change events from our associated Context.
-     * <p>
-     * Only the <code>sessionTimeout</code> property is handled: its new value
-     * (minutes) is converted to seconds and applied as the maximum inactive
-     * interval. A new value that is not an <code>Integer</code> is reported
-     * through the log and otherwise ignored.
-     * 
-     * @param event
-     *            The property change event that has occurred
-     */
-    public void propertyChange(PropertyChangeEvent event) {
-
-        // Validate the source of this event
-        if (!(event.getSource() instanceof Context))
-            return;
-        // Process a relevant property change; the constant-first comparison
-        // tolerates events that carry no property name
-        if ("sessionTimeout".equals(event.getPropertyName())) {
-            Object newValue = event.getNewValue();
-            // The former catch of NumberFormatException could never fire: a bad
-            // value fails the cast, not a number parse. Test the type instead.
-            if (newValue instanceof Integer) {
-                setMaxInactiveInterval(((Integer) newValue).intValue() * 60);
-            } else {
-                log.error(sm.getString("deltaManager.sessionTimeout", newValue));
-            }
-        }
-
-    }
-
-    // ------------------------------------------------ Replication Methods
-
-    /**
-     * Callback for a message received from another node.
-     * <p>
-     * Messages that are not {@link SessionMessage}s are ignored. While a state
-     * transfer is pending (<code>receiverQueue</code> is set) the session
-     * events listed below are parked in <code>receivedMessageQueue</code>
-     * instead of being processed; everything else is dispatched immediately.
-     * 
-     * @param cmsg -
-     *            the message received.
-     */
-    public void messageDataReceived(ClusterMessage cmsg) {
-        // instanceof also rejects null
-        if (!(cmsg instanceof SessionMessage)) {
-            return;
-        }
-        SessionMessage msg = (SessionMessage) cmsg;
-
-        boolean queueable;
-        switch (msg.getEventType()) {
-            case SessionMessage.EVT_GET_ALL_SESSIONS:
-            case SessionMessage.EVT_SESSION_CREATED:
-            case SessionMessage.EVT_SESSION_EXPIRED:
-            case SessionMessage.EVT_SESSION_ACCESSED:
-            case SessionMessage.EVT_SESSION_DELTA:
-                queueable = true;
-                break;
-            default:
-                // never queued, always handled right away
-                queueable = false;
-                break;
-        }
-
-        if (queueable) {
-            synchronized (receivedMessageQueue) {
-                if (receiverQueue) {
-                    receivedMessageQueue.add(msg);
-                    return;
-                }
-            }
-        }
-
-        messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
-    }
-
-    /**
-     * When the request has been completed, the replication valve will notify
-     * the manager, and the manager will decide whether any replication is
-     * needed or not. If there is a need for replication, the manager will
-     * create a session message and that will be replicated. The cluster
-     * determines where it gets sent.
-     * <p>
-     * Decision order:
-     * <ol>
-     * <li>pending changes in the session's delta request produce an
-     * <code>EVT_SESSION_DELTA</code> message and reset the delta request;</li>
-     * <li>otherwise, a session that was not primary on this node produces an
-     * <code>EVT_SESSION_ACCESSED</code> message;</li>
-     * <li>otherwise, an <code>EVT_SESSION_ACCESSED</code> message is produced
-     * when nothing has been replicated for longer than the maximum inactive
-     * interval.</li>
-     * </ol>
-     * The session is marked primary on this node in every case.
-     * 
-     * @param sessionId -
-     *            the sessionId that just completed.
-     * @return a SessionMessage to be sent, or <code>null</code> when nothing
-     *         has to be replicated, the session no longer exists, or the delta
-     *         could not be serialized
-     */
-    public ClusterMessage requestCompleted(String sessionId) {
-        try {
-            DeltaSession session = (DeltaSession) findSession(sessionId);
-            if (session == null) {
-                // The session is no longer known to this manager (the lookup
-                // can return null, as the receive handlers already assume),
-                // so there is nothing left to replicate.
-                return null;
-            }
-            DeltaRequest deltaRequest = session.getDeltaRequest();
-            SessionMessage msg = null;
-            boolean isDeltaRequest = false ;
-            synchronized(deltaRequest) {
-                isDeltaRequest = deltaRequest.getSize() > 0 ;
-                if (isDeltaRequest) {
-                    counterSend_EVT_SESSION_DELTA++;
-                    byte[] data = serializeDeltaRequest(deltaRequest);
-                    msg = new SessionMessageImpl(getName(),
-                                                 SessionMessage.EVT_SESSION_DELTA,
-                                                 data,
-                                                 sessionId,
-                                                 sessionId + "-" + System.currentTimeMillis());
-                    session.resetDeltaRequest();
-                }
-            }
-            if(!isDeltaRequest) {
-                if(!session.isPrimarySession()) {
-                    // primary node changed: tell the other nodes about the access
-                    counterSend_EVT_SESSION_ACCESSED++;
-                    msg = new SessionMessageImpl(getName(),
-                                                 SessionMessage.EVT_SESSION_ACCESSED,
-                                                 null,
-                                                 sessionId,
-                                                 sessionId + "-" + System.currentTimeMillis());
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
-                    }
-                }
-            } else { // log only outside synch block!
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
-                }
-            }
-            session.setPrimarySession(true);
-            //check to see if we need to send out an access message
-            if ((msg == null)) {
-                long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
-                // multiply as long: int arithmetic overflows for intervals
-                // above roughly 24 days
-                if (replDelta > (getMaxInactiveInterval() * 1000L)) {
-                    counterSend_EVT_SESSION_ACCESSED++;
-                    msg = new SessionMessageImpl(getName(),
-                                                 SessionMessage.EVT_SESSION_ACCESSED,
-                                                 null,
-                                                 sessionId,
-                                                 sessionId + "-" + System.currentTimeMillis());
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
-                    }
-                }
-
-            }
-
-            //update last replicated time
-            if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
-            return msg;
-        } catch (IOException x) {
-            log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
-            return null;
-        }
-
-    }
-    /**
-     * Reset the manager statistics. All counters go back to zero, except
-     * <code>maxActive</code> and <code>sessionCounter</code>, which restart
-     * from the current number of active sessions.
-     */
-    public synchronized void resetStatistics() {
-        // general manager statistics
-        processingTime = 0;
-        expiredSessions = 0;
-        rejectedSessions = 0;
-        sessionReplaceCounter = 0;
-        counterNoStateTransfered = 0;
-
-        // restart the session gauges from the sessions that exist right now
-        maxActive = getActiveSessions();
-        sessionCounter = getActiveSessions();
-
-        // messages received
-        counterReceive_EVT_GET_ALL_SESSIONS = 0;
-        counterReceive_EVT_ALL_SESSION_DATA = 0;
-        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
-        counterReceive_EVT_SESSION_CREATED = 0;
-        counterReceive_EVT_SESSION_ACCESSED = 0;
-        counterReceive_EVT_SESSION_DELTA = 0;
-        counterReceive_EVT_SESSION_EXPIRED = 0;
-
-        // messages sent
-        counterSend_EVT_GET_ALL_SESSIONS = 0;
-        counterSend_EVT_ALL_SESSION_DATA = 0;
-        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
-        counterSend_EVT_SESSION_CREATED = 0;
-        counterSend_EVT_SESSION_ACCESSED = 0;
-        counterSend_EVT_SESSION_DELTA = 0;
-        counterSend_EVT_SESSION_EXPIRED = 0;
-    }
-   
-    //  -------------------------------------------------------- persistence handler
-
-    /**
-     * No-op: the method body is empty, so this manager reads nothing when
-     * asked to load.
-     */
-    public void load() {
-
-    }
-
-    /**
-     * No-op: the method body is empty, so this manager writes nothing when
-     * asked to unload.
-     */
-    public void unload() {
-
-    }
-
-    //  -------------------------------------------------------- expire
-
-    /**
-     * Tell the other cluster nodes that a session has expired on this node.
-     * Counts the message, builds an <code>EVT_SESSION_EXPIRED</code> message
-     * for the session and sends it.
-     * 
-     * @param id
-     *            session id
-     */
-    protected void sessionExpired(String id) {
-        counterSend_EVT_SESSION_EXPIRED++;
-        SessionMessage expiredMsg = new SessionMessageImpl(
-                getName(),
-                SessionMessage.EVT_SESSION_EXPIRED,
-                null,
-                id,
-                id + "-EXPIRED-MSG");
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
-        }
-        send(expiredMsg);
-    }
-
-    /**
-     * Expire every session for which this node is the primary.
-     * <p>
-     * Backup (non primary) sessions and sessions that are not
-     * {@link DeltaSession}s are left untouched. Primary sessions that are
-     * still valid are expired here ("direct"); primary sessions that are
-     * already invalid are only counted ("indirect"). Both counts are written
-     * to the debug log.
-     */
-    public void expireAllLocalSessions() {
-        long timeNow = System.currentTimeMillis();
-        Session sessions[] = findSessions();
-        int expireDirect = 0;
-        int expireIndirect = 0;
-
-        if (log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
-        for (int i = 0; i < sessions.length; i++) {
-            if (!(sessions[i] instanceof DeltaSession)) {
-                continue;
-            }
-            DeltaSession session = (DeltaSession) sessions[i];
-            if (!session.isPrimarySession()) {
-                continue;
-            }
-            if (session.isValid()) {
-                session.expire();
-                expireDirect++;
-            } else {
-                expireIndirect++;
-            }
-        }
-        long timeEnd = System.currentTimeMillis();
-        // the summary used to label both counters "direct"; the second one is the indirect count
-        if (log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " expire processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired indirect sessions: " + expireIndirect);
-
-    }
-    
-    /**
-     * Hook through which the cluster can ask for sessions that expired
-     * outside of a request and should be announced across the wire.
-     * <p>
-     * This implementation always answers with a new empty array; expired
-     * sessions are announced through {@link #sessionExpired(String)} instead.
-     * 
-     * @return an empty array, never <code>null</code>
-     */
-    public String[] getInvalidatedSessions() {
-        return new String[0];
-    }
-
-    //  -------------------------------------------------------- message receive
-
-    /**
-     * Test that sender and local domain are the same.
-     * <p>
-     * NOTE(review): no comparison is performed here. <code>sameDomain</code>
-     * is fixed to <code>true</code>, so the method always returns
-     * <code>true</code> and the warning below can never be logged. Confirm
-     * whether the domain check is done elsewhere or is still to be written.
-     *
-     * @param msg the received message (only used for the warning text)
-     * @param sender the member that sent the message (only used for the
-     *            warning text)
-     * @return always <code>true</code> in this implementation
-     */
-    protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
-        boolean sameDomain= true;
-        // unreachable while sameDomain is constant
-        if (!sameDomain && log.isWarnEnabled()) {
-                log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
-                         new Object[] {getName(), 
-                         msg.getEventTypeString(), 
-                         sender,
-                         "",
-                         "" }));
-        }
-        return sameDomain ;
-    }
-
-    /**
-     * Dispatch a SessionMessage received from another node to the handler for
-     * its event type.
-     * <p>
-     * With domain replication enabled, messages that fail the sender domain
-     * check are ignored. The handler runs with the first class loader from
-     * {@link #getClassLoaders()} as context class loader (when one is
-     * available); the previous context class loader is always restored.
-     * Exceptions thrown by a handler are logged and not propagated.
-     * 
-     * @param msg -
-     *            the message received
-     * @param sender -
-     *            the sender of the message, this is used if we receive a
-     *            EVT_GET_ALL_SESSION message, so that we only reply to the
-     *            requesting node
-     */
-    protected void messageReceived(SessionMessage msg, Member sender) {
-        if (doDomainReplication() && !checkSenderDomain(msg, sender)) {
-            return;
-        }
-        final Thread current = Thread.currentThread();
-        final ClassLoader previousLoader = current.getContextClassLoader();
-        try {
-            ClassLoader[] loaders = getClassLoaders();
-            if (loaders != null && loaders.length > 0) {
-                current.setContextClassLoader(loaders[0]);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
-            }
-
-            switch (msg.getEventType()) {
-                case SessionMessage.EVT_GET_ALL_SESSIONS:
-                    handleGET_ALL_SESSIONS(msg, sender);
-                    break;
-                case SessionMessage.EVT_ALL_SESSION_DATA:
-                    handleALL_SESSION_DATA(msg, sender);
-                    break;
-                case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE:
-                    handleALL_SESSION_TRANSFERCOMPLETE(msg, sender);
-                    break;
-                case SessionMessage.EVT_SESSION_CREATED:
-                    handleSESSION_CREATED(msg, sender);
-                    break;
-                case SessionMessage.EVT_SESSION_EXPIRED:
-                    handleSESSION_EXPIRED(msg, sender);
-                    break;
-                case SessionMessage.EVT_SESSION_ACCESSED:
-                    handleSESSION_ACCESSED(msg, sender);
-                    break;
-                case SessionMessage.EVT_SESSION_DELTA:
-                    handleSESSION_DELTA(msg, sender);
-                    break;
-                default:
-                    // unknown message type: nothing to do
-                    break;
-            }
-        } catch (Exception x) {
-            log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
-        } finally {
-            current.setContextClassLoader(previousLoader);
-        }
-    }
-
-    // -------------------------------------------------------- message receiver handler
-
-
-    /**
-     * Handle the notification that the session state has been transferred
-     * completely. The reference time for replaying queued messages is set to
-     * the timestamp carried by the message and the transfer is marked as done.
-     *
-     * @param msg the completion message
-     * @param sender the member that sent the state; may be <code>null</code>
-     *            when the message carries no address
-     */
-    protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
-        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
-        if (log.isDebugEnabled()) {
-            // The dispatcher passes a null sender for messages without an
-            // address. Dereferencing it here used to throw before the state
-            // below was updated, so enabling debug logging could make a
-            // finished transfer look like a timeout.
-            Object host = sender != null ? sender.getHost() : null;
-            Object port = sender != null ? new Integer(sender.getPort()) : null;
-            log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), host, port));
-        }
-        stateTransferCreateSendTime = msg.getTimestamp() ;
-        stateTransfered = true ;
-    }
-
-    /**
-     * Apply a session delta received from another node to the local copy of
-     * the session and mark that copy as non primary. Deltas for sessions that
-     * are unknown on this node are counted but otherwise ignored.
-     *
-     * @param msg message carrying the session id and the serialized delta
-     * @param sender the member that sent the delta (not used)
-     * @throws IOException
-     * @throws ClassNotFoundException
-     */
-    protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
-        counterReceive_EVT_SESSION_DELTA++;
-        byte[] serializedDelta = msg.getSession();
-        DeltaSession target = (DeltaSession) findSession(msg.getSessionID());
-        if (target == null) {
-            return;
-        }
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
-        }
-        DeltaRequest request = deserializeDeltaRequest(target, serializedDelta);
-        request.execute(target, notifyListenersOnReplication);
-        target.setPrimarySession(false);
-    }
-
-    /**
-     * Handle the notification that a session was accessed on another node:
-     * the local copy is touched and marked as non primary. Unknown sessions
-     * are counted but otherwise ignored.
-     *
-     * @param msg message carrying the session id
-     * @param sender the member that sent the message (not used)
-     * @throws IOException
-     */
-    protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
-        counterReceive_EVT_SESSION_ACCESSED++;
-        DeltaSession touched = (DeltaSession) findSession(msg.getSessionID());
-        if (touched == null) {
-            return;
-        }
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
-        }
-        touched.access();
-        touched.setPrimarySession(false);
-        touched.endAccess();
-    }
-
-    /**
-     * Handle the notification that a session expired on another node by
-     * expiring the local copy as well. Unknown sessions are counted but
-     * otherwise ignored.
-     *
-     * @param msg message carrying the session id
-     * @param sender the member that sent the message (not used)
-     * @throws IOException
-     */
-    protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {
-        counterReceive_EVT_SESSION_EXPIRED++;
-        DeltaSession expired = (DeltaSession) findSession(msg.getSessionID());
-        if (expired == null) {
-            return;
-        }
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
-        }
-        expired.expire(notifySessionListenersOnReplication, false);
-    }
-
-    /**
-     * Handle the notification that a session was created on another node by
-     * creating a local, non primary copy with the same id and creation time.
-     * The id is assigned through <code>setId</code> or
-     * <code>setIdInternal</code>, depending on
-     * <code>notifySessionListenersOnReplication</code>.
-     *
-     * @param msg message carrying the session id and creation timestamp
-     * @param sender the member that sent the message (not used)
-     */
-    protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
-        counterReceive_EVT_SESSION_CREATED++;
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
-        }
-        DeltaSession replica = (DeltaSession) createEmptySession();
-        replica.setManager(this);
-        replica.setValid(true);
-        replica.setPrimarySession(false);
-        replica.setCreationTime(msg.getTimestamp());
-        replica.access();
-        if (notifySessionListenersOnReplication) {
-            replica.setId(msg.getSessionID());
-        } else {
-            replica.setIdInternal(msg.getSessionID());
-        }
-        // drop whatever the setup calls above recorded in the delta request
-        replica.resetDeltaRequest();
-        replica.endAccess();
-
-    }
-
-    /**
-     * Handle a block of serialized sessions received from another node
-     * (restart) by deserializing it into this manager.
-     * <p>
-     * The transfer is not marked as complete here; that happens when the
-     * separate transfer-complete message is handled.
-     *
-     * @param msg message carrying the serialized sessions
-     * @param sender the member that sent the sessions (not used)
-     * @throws ClassNotFoundException
-     * @throws IOException
-     */
-    protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
-        counterReceive_EVT_ALL_SESSION_DATA++;
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
-        }
-        deserializeSessions(msg.getSession());
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
-        }
-    }
-
-    /**
-     * Handle the request of another node for all sessions (restart).
-     * <ul>
-     * <li>a) send all sessions with one message, or</li>
-     * <li>b) send the sessions in blocks of
-     * <code>getSendAllSessionsSize()</code>, pausing
-     * <code>getSendAllSessionsWaitTime()</code> milliseconds between two
-     * blocks.</li>
-     * </ul>
-     * Afterwards a transfer-complete message is sent. All messages carry the
-     * time at which the session list was taken.
-     * <p>
-     * A block size below one cannot be honoured and is treated like mode a).
-     *
-     * @param msg the request (not inspected)
-     * @param sender the requesting member; all answers go to it
-     * @throws IOException
-     */
-    protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
-        counterReceive_EVT_GET_ALL_SESSIONS++;
-        //get a list of all the session from this manager
-        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
-        // get all sessions and serialize without sync
-        Session[] currentSessions = findSessions();
-        long findSessionTimestamp = System.currentTimeMillis() ;
-        int blockSize = getSendAllSessionsSize();
-        if (isSendAllSessions() || blockSize < 1) {
-            // blockSize < 1 used to loop forever (0) or fail on the array
-            // allocation (negative)
-            sendSessions(sender, currentSessions, findSessionTimestamp);
-        } else {
-            // send sessions in blocks
-            for (int offset = 0; offset < currentSessions.length; offset += blockSize) {
-                int len = Math.min(blockSize, currentSessions.length - offset);
-                // A fresh array per block: the former shared buffer kept the
-                // tail of the previous block when the last block was shorter,
-                // so those sessions were sent a second time.
-                Session[] block = new Session[len];
-                System.arraycopy(currentSessions, offset, block, 0, len);
-                sendSessions(sender, block, findSessionTimestamp);
-                // pause only between blocks, not after the last one
-                boolean moreBlocks = offset + len < currentSessions.length;
-                if (moreBlocks && getSendAllSessionsWaitTime() > 0) {
-                    try {
-                        Thread.sleep(getSendAllSessionsWaitTime());
-                    } catch (Exception sleep) {
-                        // best effort pause; continue with the next block
-                    }
-                }
-            }
-        }
-
-        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
-        newmsg.setTimestamp(findSessionTimestamp);
-        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
-        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
-        cluster.send(newmsg, sender);
-    }
-
-
-    /**
-     * Serialize a block of sessions and send it to one member as an
-     * <code>EVT_ALL_SESSION_DATA</code> message.
-     *
-     * @param sender the member that asked for the sessions and receives them
-     * @param currentSessions the sessions to serialize into this message
-     * @param sendTimestamp timestamp to stamp on the message
-     * @throws IOException
-     */
-    protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {
-        byte[] payload = serializeSessions(currentSessions);
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
-        }
-        SessionMessage stateMsg = new SessionMessageImpl(
-                name,
-                SessionMessage.EVT_ALL_SESSION_DATA,
-                payload,
-                "SESSION-STATE",
-                "SESSION-STATE-" + getName());
-        stateMsg.setTimestamp(sendTimestamp);
-        if (log.isDebugEnabled()) {
-            log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
-        }
-        counterSend_EVT_ALL_SESSION_DATA++;
-        cluster.send(stateMsg, sender);
-    }
-
-    /**
-     * Create a new manager configured like this one. The copy gets the name
-     * of this manager prefixed with <code>Clone-from-</code> and shares the
-     * cluster and the cached replication valve with this manager.
-     *
-     * @return the new, independently usable manager
-     */
-    public ClusterManager cloneFromTemplate() {
-        DeltaManager copy = new DeltaManager();
-
-        // identity and shared collaborators
-        copy.name = "Clone-from-"+name;
-        copy.cluster = cluster;
-        copy.replicationValve = replicationValve;
-
-        // session handling configuration
-        copy.maxActiveSessions = maxActiveSessions;
-        copy.expireSessionsOnShutdown = expireSessionsOnShutdown;
-        copy.notifyListenersOnReplication = notifyListenersOnReplication;
-        copy.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
-
-        // state transfer configuration
-        copy.stateTransferTimeout = stateTransferTimeout;
-        copy.sendAllSessions = sendAllSessions;
-        copy.sendClusterDomainOnly = sendClusterDomainOnly;
-        copy.sendAllSessionsSize = sendAllSessionsSize;
-        copy.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
-        copy.stateTimestampDrop = stateTimestampDrop;
-
-        // current state transfer bookkeeping is carried over as well
-        copy.receiverQueue = receiverQueue;
-        copy.stateTransferCreateSendTime = stateTransferCreateSendTime;
-
-        return copy;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.ha.session;
+
+import java.beans.PropertyChangeEvent;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+
+import org.apache.catalina.Cluster;
+import org.apache.catalina.Container;
+import org.apache.catalina.Context;
+import org.apache.catalina.Engine;
+import org.apache.catalina.Host;
+import org.apache.catalina.LifecycleException;
+import org.apache.catalina.LifecycleListener;
+import org.apache.catalina.Session;
+import org.apache.catalina.Valve;
+import org.apache.catalina.core.StandardContext;
+import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterMessage;
+import org.apache.catalina.ha.tcp.ReplicationValve;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.ReplicationStream;
+import org.apache.catalina.util.LifecycleSupport;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.ha.ClusterManager;
+
+/**
+ * The DeltaManager manages replicated sessions by only replicating the deltas
+ * in data. For applications written to handle this, the DeltaManager is the
+ * optimal way of replicating data.
+ * 
+ * This code is almost identical to StandardManager with a difference in how it
+ * persists sessions and some modifications to it.
+ * 
+ * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
+ * reloading depends upon external calls to the <code>start()</code> and
+ * <code>stop()</code> methods of this class at the correct times.
+ * 
+ * @author Filip Hanik
+ * @author Craig R. McClanahan
+ * @author Jean-Francois Arcand
+ * @author Peter Rossbach
+ * @version $Revision$ $Date$
+ */
+
+public class DeltaManager extends ClusterManagerBase{
+
+    // ------------------------------------------------------- Static Variables
+    // Logger shared by all DeltaManager instances.
+    public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
+
+    /**
+     * The string manager for this package; used to resolve the log and
+     * exception message keys referenced throughout this class.
+     */
+    protected static StringManager sm = StringManager.getManager(Constants.Package);
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * The descriptive information about this implementation, returned by
+     * <code>getInfo()</code>.
+     */
+    private static final String info = "DeltaManager/2.1";
+
+    /**
+     * Has this component been started yet? Set by <code>start()</code>.
+     */
+    private boolean started = false;
+
+    /**
+     * The descriptive name of this Manager implementation (for logging).
+     */
+    protected static String managerName = "DeltaManager";
+    // Name of this manager instance; passed as the first argument of the
+    // session messages created by sendCreateSession().
+    protected String name = null;
+    // Exposed through isDefaultMode()/setDefaultMode(); the code that acts on
+    // it is not part of this section.
+    protected boolean defaultMode = false;
+    // Cluster used by send(); may still be null when no cluster has been set.
+    private CatalinaCluster cluster = null;
+
+    /**
+     * cached replication valve cluster container!
+     */
+    private ReplicationValve replicationValve = null ;
+    
+    /**
+     * The lifecycle event support for this component.
+     */
+    protected LifecycleSupport lifecycle = new LifecycleSupport(this);
+
+    /**
+     * The maximum number of active Sessions allowed, or -1 for no limit.
+     */
+    private int maxActiveSessions = -1;
+    // Configuration flags exposed through the accessors below; the code that
+    // consumes most of them lies outside this section.
+    private boolean expireSessionsOnShutdown = false;
+    private boolean notifyListenersOnReplication = true;
+    private boolean notifySessionListenersOnReplication = true;
+    private boolean stateTransfered = false ;
+    // NOTE(review): unit is not visible here (presumably seconds) - confirm.
+    private int stateTransferTimeout = 60;
+    private boolean sendAllSessions = true;
+    // When true, send() uses cluster.sendClusterDomain() instead of cluster.send().
+    private boolean sendClusterDomainOnly = true ;
+    private int sendAllSessionsSize = 1000 ;
+    
+    /**
+     * Wait time in milliseconds between sending session blocks
+     * (default 2 seconds).
+     */
+    private int sendAllSessionsWaitTime = 2 * 1000 ; 
+    // Raw list of received messages; only its size is read in this section
+    // (see getReceivedQueueSize()).
+    private ArrayList receivedMessageQueue = new ArrayList() ;
+    private boolean receiverQueue = false ;
+    private boolean stateTimestampDrop = true ;
+    private long stateTransferCreateSendTime; 
+    
+    // ------------------------------------------------------------------ stats attributes
+    
+    // Incremented by createSession() each time the maxActiveSessions limit
+    // rejects a new session.
+    int rejectedSessions = 0;
+    // Incremented by deserializeSessions() when a received session replaces
+    // one that already exists under the same id.
+    private long sessionReplaceCounter = 0 ;
+    long processingTime = 0;
+    // Per-event counters; each is exposed through a getter of the same name.
+    private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
+    private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
+    private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
+    private long counterReceive_EVT_SESSION_CREATED = 0 ;
+    private long counterReceive_EVT_SESSION_EXPIRED = 0;
+    private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
+    private long counterReceive_EVT_SESSION_DELTA = 0;
+    private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
+    private long counterSend_EVT_SESSION_CREATED = 0;
+    private long counterSend_EVT_SESSION_DELTA = 0 ;
+    private long counterSend_EVT_SESSION_ACCESSED = 0;
+    private long counterSend_EVT_SESSION_EXPIRED = 0;
+    private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
+    private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
+    private int counterNoStateTransfered = 0 ;
+
+
+    // ------------------------------------------------------------- Constructor
+    /**
+     * Creates a manager with the default field values declared above.
+     */
+    public DeltaManager() {
+        // The superclass no-argument constructor is invoked implicitly.
+    }
+
+    // ------------------------------------------------------------- Properties
+    
+    /**
+     * Returns the implementation descriptor of this Manager in the form
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     *
+     * @return the constant descriptor string of this class
+     */
+    public String getInfo() {
+        return DeltaManager.info;
+    }
+
+    /**
+     * Sets the name of this manager instance. The name is later used as the
+     * first argument of the session messages built by
+     * <code>sendCreateSession()</code>.
+     *
+     * @param name The new manager name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Returns the name assigned to this manager instance.
+     *
+     * @return the value of the <code>name</code> field, which is initially
+     *         <code>null</code>
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_GET_ALL_SESSIONS</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterSend_EVT_GET_ALL_SESSIONS() {
+        return this.counterSend_EVT_GET_ALL_SESSIONS;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_SESSION_ACCESSED</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterSend_EVT_SESSION_ACCESSED() {
+        return this.counterSend_EVT_SESSION_ACCESSED;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_SESSION_CREATED</code> statistic, which
+     * <code>sendCreateSession()</code> increments for every creation message
+     * it sends.
+     *
+     * @return the counter value
+     */
+    public long getCounterSend_EVT_SESSION_CREATED() {
+        return this.counterSend_EVT_SESSION_CREATED;
+    }
+
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_SESSION_DELTA</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterSend_EVT_SESSION_DELTA() {
+        return this.counterSend_EVT_SESSION_DELTA;
+    }
+
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_SESSION_EXPIRED</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterSend_EVT_SESSION_EXPIRED() {
+        return this.counterSend_EVT_SESSION_EXPIRED;
+    }
+ 
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_ALL_SESSION_DATA</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterSend_EVT_ALL_SESSION_DATA() {
+        return this.counterSend_EVT_ALL_SESSION_DATA;
+    }
+
+    /**
+     * Returns the current value of the
+     * <code>counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE</code> statistic.
+     *
+     * @return the counter value
+     */
+    public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
+        return this.counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
+    }
+ 
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_ALL_SESSION_DATA</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterReceive_EVT_ALL_SESSION_DATA() {
+        return this.counterReceive_EVT_ALL_SESSION_DATA;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_GET_ALL_SESSIONS</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
+        return this.counterReceive_EVT_GET_ALL_SESSIONS;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_SESSION_ACCESSED</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterReceive_EVT_SESSION_ACCESSED() {
+        return this.counterReceive_EVT_SESSION_ACCESSED;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_SESSION_CREATED</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterReceive_EVT_SESSION_CREATED() {
+        return this.counterReceive_EVT_SESSION_CREATED;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_SESSION_DELTA</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterReceive_EVT_SESSION_DELTA() {
+        return this.counterReceive_EVT_SESSION_DELTA;
+    }
+    
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_SESSION_EXPIRED</code> statistic.
+     *
+     * @return the counter value
+     */
+    public long getCounterReceive_EVT_SESSION_EXPIRED() {
+        return this.counterReceive_EVT_SESSION_EXPIRED;
+    }
+    
+    
+    /**
+     * Returns the current value of the
+     * <code>counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE</code> statistic.
+     *
+     * @return the counter value
+     */
+    public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
+        return this.counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
+    }
+    
+    /**
+     * Returns the current value of the <code>processingTime</code> statistic.
+     *
+     * @return the accumulated processing time value
+     */
+    public long getProcessingTime() {
+        return this.processingTime;
+    }
+ 
+    /**
+     * Returns how many times <code>deserializeSessions()</code> received a
+     * session whose id was already known to this manager.
+     *
+     * @return the replace counter value
+     */
+    public long getSessionReplaceCounter() {
+        return this.sessionReplaceCounter;
+    }
+    
+    /**
+     * Returns the number of session creations that were refused because the
+     * <code>maxActiveSessions</code> limit had been reached.
+     *
+     * @return The count
+     */
+    public int getRejectedSessions() {
+        return this.rejectedSessions;
+    }
+
+    /**
+     * Overwrites the rejected-session counter, for example to reset it.
+     *
+     * @param rejectedSessions The new counter value
+     */
+    public void setRejectedSessions(int rejectedSessions) {
+        this.rejectedSessions = rejectedSessions;
+    }
+
+    /**
+     * Returns the current value of the <code>counterNoStateTransfered</code>
+     * statistic.
+     *
+     * @return the counter value
+     */
+    public int getCounterNoStateTransfered() {
+        return this.counterNoStateTransfered;
+    }
+    
+    /**
+     * Returns the number of messages currently held in the received message
+     * queue.
+     *
+     * @return the queue size
+     */
+    public int getReceivedQueueSize() {
+        final int queued = receivedMessageQueue.size();
+        return queued;
+    }
+    
+    /**
+     * Returns the configured state transfer timeout.
+     *
+     * @return the value of <code>stateTransferTimeout</code>
+     */
+    public int getStateTransferTimeout() {
+        return this.stateTransferTimeout;
+    }
+    /**
+     * Sets the state transfer timeout.
+     *
+     * @param timeoutAllSession The new timeout value
+     */
+    public void setStateTransferTimeout(int timeoutAllSession) {
+        stateTransferTimeout = timeoutAllSession;
+    }
+
+    /**
+     * Reports whether the session state transfer has been flagged as
+     * complete.
+     *
+     * @return the value of the <code>stateTransfered</code> flag
+     */
+    public boolean getStateTransfered() {
+        return this.stateTransfered;
+    }
+
+    /**
+     * Marks the session state transfer as complete or not complete.
+     *
+     * @param stateTransfered The new flag value
+     */
+    public void setStateTransfered(boolean stateTransfered) {
+        this.stateTransfered = stateTransfered;
+    }
+    
+    /**
+     * Returns the wait time between sending session blocks.
+     *
+     * @return the wait time in milliseconds
+     */
+    public int getSendAllSessionsWaitTime() {
+        return this.sendAllSessionsWaitTime;
+    }
+    
+    /**
+     * Sets the wait time between sending session blocks.
+     *
+     * @param sendAllSessionsWaitTime The new wait time in milliseconds
+     */
+    public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
+        this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
+    }
+    
+    /**
+     * Reports whether messages are sent to the cluster domain only.
+     * <code>send()</code> consults this to choose between
+     * <code>sendClusterDomain()</code> and <code>send()</code> on the cluster.
+     *
+     * @return the value of the <code>sendClusterDomainOnly</code> flag
+     */
+    public boolean doDomainReplication() {
+        return this.sendClusterDomainOnly;
+    }
+    
+    /**
+     * Selects whether <code>send()</code> addresses only the cluster domain
+     * (<code>true</code>) or the whole cluster (<code>false</code>).
+     *
+     * @param sendClusterDomainOnly The new flag value
+     */
+    public void setDomainReplication(boolean sendClusterDomainOnly) {
+        this.sendClusterDomainOnly = sendClusterDomainOnly;
+    }
+
+    /**
+     * Returns the <code>stateTimestampDrop</code> flag.
+     *
+     * @return the current flag value
+     */
+    public boolean isStateTimestampDrop() {
+        return this.stateTimestampDrop;
+    }
+    
+    /**
+     * Sets the <code>stateTimestampDrop</code> flag.
+     *
+     * @param isTimestampDrop The new flag value
+     */
+    public void setStateTimestampDrop(boolean isTimestampDrop) {
+        stateTimestampDrop = isTimestampDrop;
+    }
+    
+    /**
+     * Returns the upper bound on concurrently active sessions.
+     *
+     * @return the maximum number of active sessions, or -1 for no limit
+     */
+    public int getMaxActiveSessions() {
+        return maxActiveSessions;
+    }
+
+    /**
+     * Changes the upper bound on concurrently active sessions and fires a
+     * <code>maxActiveSessions</code> property change event carrying the old
+     * and the new value.
+     * 
+     * @param max
+     *            The new maximum number of sessions, or -1 for no limit
+     */
+    public void setMaxActiveSessions(int max) {
+        // Capture the previous value before it is overwritten.
+        final Integer previous = new Integer(this.maxActiveSessions);
+        this.maxActiveSessions = max;
+        final Integer current = new Integer(this.maxActiveSessions);
+        support.firePropertyChange("maxActiveSessions", previous, current);
+    }
+    
+    /**
+     * Returns the <code>sendAllSessions</code> flag.
+     *
+     * @return the current flag value
+     */
+    public boolean isSendAllSessions() {
+        return this.sendAllSessions;
+    }
+    
+    /**
+     * Sets the <code>sendAllSessions</code> flag.
+     *
+     * @param sendAllSessions The new flag value
+     */
+    public void setSendAllSessions(boolean sendAllSessions) {
+        this.sendAllSessions = sendAllSessions;
+    }
+    
+    /**
+     * Returns the configured <code>sendAllSessionsSize</code>.
+     *
+     * @return the current size value
+     */
+    public int getSendAllSessionsSize() {
+        return this.sendAllSessionsSize;
+    }
+    
+    /**
+     * Sets the <code>sendAllSessionsSize</code> value.
+     * NOTE(review): presumably the number of sessions per transfer block -
+     * confirm against the code that sends all sessions.
+     *
+     * @param sendAllSessionsSize The new size value
+     */
+    public void setSendAllSessionsSize(int sendAllSessionsSize) {
+        this.sendAllSessionsSize = sendAllSessionsSize;
+    }
+    
+    /**
+     * Returns the <code>notifySessionListenersOnReplication</code> flag.
+     *
+     * @return the current flag value
+     */
+    public boolean isNotifySessionListenersOnReplication() {
+        return this.notifySessionListenersOnReplication;
+    }
+    
+    /**
+     * Sets the <code>notifySessionListenersOnReplication</code> flag.
+     *
+     * @param notifyListenersCreateSessionOnReplication The new flag value
+     */
+    public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) {
+        notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
+    }
+    
+    
+    /**
+     * Returns the <code>expireSessionsOnShutdown</code> flag.
+     *
+     * @return the current flag value
+     */
+    public boolean isExpireSessionsOnShutdown() {
+        return this.expireSessionsOnShutdown;
+    }
+
+    /**
+     * Sets the <code>expireSessionsOnShutdown</code> flag.
+     * NOTE(review): the code acting on this flag is outside this section;
+     * presumably it decides whether sessions are expired when the manager
+     * stops - confirm.
+     *
+     * @param expireSessionsOnShutdown The new flag value
+     */
+    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
+        this.expireSessionsOnShutdown = expireSessionsOnShutdown;
+    }
+    
+    /**
+     * Returns the <code>notifyListenersOnReplication</code> flag.
+     *
+     * @return the current flag value
+     */
+    public boolean isNotifyListenersOnReplication() {
+        return this.notifyListenersOnReplication;
+    }
+
+    /**
+     * Sets the <code>notifyListenersOnReplication</code> flag.
+     *
+     * @param notifyListenersOnReplication The new flag value
+     */
+    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
+        this.notifyListenersOnReplication = notifyListenersOnReplication;
+    }
+
+    
+    /**
+     * Returns the <code>defaultMode</code> flag.
+     *
+     * @return the current flag value
+     */
+    public boolean isDefaultMode() {
+        return this.defaultMode;
+    }
+    /**
+     * Sets the <code>defaultMode</code> flag.
+     *
+     * @param defaultMode The new flag value
+     */
+    public void setDefaultMode(boolean defaultMode) {
+        this.defaultMode = defaultMode;
+    }
+    
+    /**
+     * Returns the cluster this manager sends its messages through.
+     *
+     * @return the associated cluster, or <code>null</code> if none has been
+     *         set yet
+     */
+    public CatalinaCluster getCluster() {
+        return this.cluster;
+    }
+
+    /**
+     * Associates this manager with the cluster used by <code>send()</code>.
+     *
+     * @param cluster The cluster to send messages through
+     */
+    public void setCluster(CatalinaCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * Set the Container with which this Manager has been associated. If it is a
+     * Context (the usual case), listen for changes to the session timeout
+     * property.
+     * 
+     * @param container
+     *            The associated Container
+     */
+    public void setContainer(Container container) {
+        // De-register from the old Container (if any)
+        if ((this.container != null) && (this.container instanceof Context))
+            ((Context) this.container).removePropertyChangeListener(this);
+
+        // Default processing provided by our superclass
+        super.setContainer(container);
+
+        // Register with the new Container (if any)
+        if ((this.container != null) && (this.container instanceof Context)) {
+            setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
+            ((Context) this.container).addPropertyChangeListener(this);
+        }
+
+    }
+    
+    // --------------------------------------------------------- Public Methods
+
+    /**
+     * Construct and return a new session object, based on the default settings
+     * specified by this Manager's properties, and announce it to the other
+     * cluster nodes. Equivalent to <code>createSession(sessionId, true)</code>.
+     * 
+     * @param sessionId
+     *            The session id handed on to the two-argument variant
+     * @return The new session
+     * @exception IllegalStateException
+     *                if the maximum number of active sessions is reached
+     */
+    public Session createSession(String sessionId) {
+        final boolean distribute = true;
+        return createSession(sessionId, distribute);
+    }
+
+    /**
+     * Create a new session after checking <code>maxActiveSessions</code> and,
+     * if requested, send the session creation to the other cluster nodes.
+     * 
+     * @param sessionId
+     *            The session id passed on to the superclass
+     * @param distribute
+     *            <code>true</code> to send a creation message to the cluster
+     * @return The session
+     * @exception IllegalStateException
+     *                if the maximum number of active sessions is reached; the
+     *                rejected session counter is incremented first
+     */
+    public Session createSession(String sessionId, boolean distribute) {
+        final boolean limitReached = (maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions);
+        if (limitReached) {
+            rejectedSessions++;
+            throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
+        }
+
+        DeltaSession created = (DeltaSession) super.createSession(sessionId);
+        if (distribute) {
+            sendCreateSession(created.getId(), created);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("deltaManager.createSession.newSession", created.getId(), new Integer(sessions.size())));
+        }
+        return created;
+    }
+
+    /**
+     * Send a session created event to the other cluster members.
+     * <p>
+     * Nothing is sent when no cluster has been set or when the cluster
+     * reports no members. The message timestamp is set to the creation time
+     * of the session.
+     * 
+     * @param sessionId
+     *            The id of the new session
+     * @param session
+     *            The new session; only its creation time is read
+     */
+    protected void sendCreateSession(String sessionId, DeltaSession session) {
+        // send() already treats a missing cluster as a legal state; do the same
+        // here instead of failing with a NullPointerException after the session
+        // has been created.
+        if (cluster == null || cluster.getMembers().length == 0) {
+            return;
+        }
+        SessionMessage msg = 
+            new SessionMessageImpl(getName(),
+                                   SessionMessage.EVT_SESSION_CREATED, 
+                                   null, 
+                                   sessionId,
+                                   sessionId + "-" + System.currentTimeMillis());
+        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
+        msg.setTimestamp(session.getCreationTime());
+        counterSend_EVT_SESSION_CREATED++;
+        send(msg);
+    }
+    
+    /**
+     * Send a message to the other members: to the cluster domain when
+     * <code>doDomainReplication()</code> is true, otherwise to the whole
+     * cluster. Does nothing when no cluster has been set.
+     * 
+     * @param msg Session message
+     */
+    protected void send(SessionMessage msg) {
+        if (cluster == null) {
+            return;
+        }
+        if (doDomainReplication()) {
+            cluster.sendClusterDomain(msg);
+        } else {
+            cluster.send(msg);
+        }
+    }
+
+    /**
+     * Create DeltaSession
+     * @see org.apache.catalina.Manager#createEmptySession()
+     */
+    public Session createEmptySession() {
+        return getNewDeltaSession() ;
+    }
+    
+    /**
+     * Factory for the session instances handed out by
+     * <code>createEmptySession()</code>.
+     * 
+     * @return A new DeltaSession constructed with this manager
+     */
+    protected DeltaSession getNewDeltaSession() {
+        DeltaSession fresh = new DeltaSession(this);
+        return fresh;
+    }
+
+    /**
+     * Load a DeltaRequest received from another node into the delta request
+     * of the given session. The data is read through the replication stream
+     * obtained from <code>getReplicationStream(data)</code>.
+     * 
+     * @see DeltaRequest#readExternal(java.io.ObjectInput)
+     * @param session
+     *            The session whose delta request is filled from the data
+     * @param data
+     *            message data
+     * @return The delta request of the session
+     * @throws ClassNotFoundException
+     *             if a serialized class cannot be found
+     * @throws IOException
+     *             if the data cannot be read or the stream cannot be closed
+     */
+    protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException {
+        ReplicationStream ois = getReplicationStream(data);
+        try {
+            session.getDeltaRequest().readExternal(ois);
+        } finally {
+            // Release the stream even when readExternal() fails
+            ois.close();
+        }
+        return session.getDeltaRequest();
+    }
+
+    /**
+     * serialize DeltaRequest
+     * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
+     * 
+     * @param deltaRequest
+     * @return serialized delta request
+     * @throws IOException
+     */
+    protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest) throws IOException {
+        return deltaRequest.serialize();
+    }
+
+    /**
+     * Load sessions from other cluster node.
+     * <p>
+     * The data must start with an <code>Integer</code> session count followed
+     * by that many session records; each record is read with
+     * <code>DeltaSession.readObjectData()</code>. Every loaded session is
+     * bound to this manager, marked valid and non-primary, and added to the
+     * session map. <code>sessionCounter</code> is incremented for ids not yet
+     * known, <code>sessionReplaceCounter</code> for ids that already exist.
+     * <p>
+     * FIXME replace currently sessions with same id without notifcation.
+     * FIXME SSO handling is not really correct with the session replacement!
+     * 
+     * @param data
+     *            The serialized session block
+     * @exception ClassNotFoundException
+     *                if a serialized class cannot be found during the reload
+     * @exception IOException
+     *                if an input/output error occurs
+     */
+    protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException {
+
+        // Existing sessions are deliberately kept: the received sessions are
+        // added to (or replace entries in) the current session map.
+        // Remember the context class loader so it can be restored in the
+        // finally block.
+        // NOTE(review): nothing visible in this method changes the context
+        // class loader - confirm whether a callee does.
+        ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
+        ObjectInputStream ois = null;
+        // Read the session count, then each session record
+        try {
+            ois = getReplicationStream(data);
+            Integer count = (Integer) ois.readObject();
+            int n = count.intValue();
+            for (int i = 0; i < n; i++) {
+                DeltaSession session = (DeltaSession) createEmptySession();
+                session.readObjectData(ois);
+                session.setManager(this);
+                session.setValid(true);
+                // this node only holds a replica of the session
+                session.setPrimarySession(false);
+                //in case the nodes in the cluster are out of
+                //time synch, this will make sure that we have the
+                //correct timestamp, isValid returns true, cause
+                // accessCount=1
+                session.access();
+                //make sure that the session gets ready to expire if
+                // needed
+                session.setAccessCount(0);
+                session.resetDeltaRequest();
+                // FIXME How inform other session id cache like SingleSignOn
+                // increment sessionCounter to correct stats report
+                if (findSession(session.getIdInternal()) == null ) {
+                    sessionCounter++;
+                } else {
+                    sessionReplaceCounter++;
+                    // FIXME better is to grap this sessions again !
+                    if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
+                }
+                add(session);
+            }
+        } catch (ClassNotFoundException e) {
+            // log with context, then let the caller handle the failure
+            log.error(sm.getString("deltaManager.loading.cnfe", e), e);
+            throw e;
+        } catch (IOException e) {
+            log.error(sm.getString("deltaManager.loading.ioe", e), e);
+            throw e;
+        } finally {
+            // Close the input stream
+            try {
+                if (ois != null) ois.close();
+            } catch (IOException f) {
+                // ignored
+            }
+            ois = null;
+            // NOTE(review): a null original loader is never restored
+            if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
+        }
+
+    }
+
+    
+
+    /**
+     * Save any currently active sessions in the appropriate persistence
+     * mechanism, if any. If persistence is not supported, this method returns
+     * without doing anything.
+     * 
+     * @exception IOException
+     *                if an input/output error occurs
+     */
+    protected byte[] serializeSessions(Session[] currentSessions) throws IOException {
+
+        // Open an output stream to the specified pathname, if any
+        ByteArrayOutputStream fos = null;
+        ObjectOutputStream oos = null;
+
+        try {
+            fos = new ByteArrayOutputStream();
+            oos = new ObjectOutputStream(new BufferedOutputStream(fos));
+            oos.writeObject(new Integer(currentSessions.length));
+            for(int i=0 ; i < currentSessions.length;i++) {
+                ((DeltaSession)currentSessions[i]).writeObjectData(oos);                
+            }
+            // Flush and close the output stream
+            oos.flush();
+        } catch (IOException e) {
+            log.error(sm.getString("deltaManager.unloading.ioe", e), e);
+            throw e;
+        } finally {
+            if (oos != null) {
+                try {
+                    oos.close();
+                } catch (IOException f) {
+                    ;
+                }
+                oos = null;
+            }
+        }
+        // send object data as byte[]
+        return fos.toByteArray();
+    }
+
+    // ------------------------------------------------------ Lifecycle Methods
+
+    /**
+     * Register a lifecycle event listener with this component's lifecycle
+     * support.
+     * 
+     * @param listener
+     *            The listener to add
+     */
+    public void addLifecycleListener(LifecycleListener listener) {
+        this.lifecycle.addLifecycleListener(listener);
+    }
+
+    /**
+     * Get the lifecycle listeners registered with this component's lifecycle
+     * support.
+     * 
+     * @return The registered listeners, as reported by the lifecycle support
+     */
+    public LifecycleListener[] findLifecycleListeners() {
+        final LifecycleListener[] registered = this.lifecycle.findLifecycleListeners();
+        return registered;
+    }
+
+    /**
+     * Unregister a lifecycle event listener from this component's lifecycle
+     * support.
+     * 
+     * @param listener
+     *            The listener to remove
+     */
+    public void removeLifecycleListener(LifecycleListener listener) {
+        this.lifecycle.removeLifecycleListener(listener);
+    }
+
+    /**
+     * Prepare for the beginning of active use of the public methods of this
+     * component. This method should be called after <code>configure()</code>,
+     * and before any of the public methods of the component are utilized.
+     * 
+     * @exception LifecycleException
+     *                if this component detects a fatal error that prevents this
+     *                component from being used
+     */
+    public void start() throws LifecycleException {
+        if (!initialized) init();
+
+        // Validate and update our current component state
+        if (started) {
+            return;
+        }
+        started = true;
+        lifecycle.fireLifecycleEvent(START_EVENT, null);
+
+        // Force initialization of the random number generator
+        generateSessionId();
+
+        // Load unloaded sessions, if any
+        try {
+            //the channel is already running
+            Cluster cluster = getCluster() ;
+            // stop remove cluster binding
+            //wow, how many nested levels of if statements can we have ;)
+            if(cluster == null) {
+                Container context = getContainer() ;
+                if(context != null && context instanceof Context) {
+                     Container host = context.getParent() ;
+                     if(host != null && host instanceof Host) {
+                         cluster = host.getCluster();
+                         if(cluster != null && cluster instanceof CatalinaCluster) {
+                             setCluster((CatalinaCluster) cluster) ;
+                         } else {
+                             Container engine = host.getParent() ;
+                             if(engine != null && engine instanceof Engine) {
+                                 cluster = engine.getCluster();
+                                 if(cluster != null && cluster instanceof CatalinaCluster) {
+                                     setCluster((CatalinaCluster) cluster) ;
+                                 }
+                             } else {
+                                     cluster = null ;
+                             }

[... 702 lines stripped ...]


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org