You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2006/01/05 20:34:13 UTC

svn commit: r366255 - in /tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster: session/DeltaManager.java session/DeltaSession.java tcp/LocalStrings.properties tcp/ReplicationValve.java tcp/mbeans-descriptors.xml

Author: pero
Date: Thu Jan  5 11:34:04 2006
New Revision: 366255

URL: http://svn.apache.org/viewcvs?rev=366255&view=rev
Log:
Add support for cross context session replication

Modified:
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Thu Jan  5 11:34:04 2006
@@ -34,6 +34,7 @@
 import org.apache.catalina.Context;
 import org.apache.catalina.Engine;
 import org.apache.catalina.Host;
+import org.apache.catalina.Valve;
 import org.apache.catalina.Lifecycle;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.LifecycleListener;
@@ -44,9 +45,11 @@
 import org.apache.catalina.cluster.ClusterMessage;
 import org.apache.catalina.cluster.Member;
 import org.apache.catalina.session.ManagerBase;
+import org.apache.catalina.cluster.tcp.ReplicationValve;
 import org.apache.catalina.util.CustomObjectInputStream;
 import org.apache.catalina.util.LifecycleSupport;
 import org.apache.catalina.util.StringManager;
+import org.apache.catalina.core.StandardContext;
 
 /**
  * The DeltaManager manages replicated sessions by only replicating the deltas
@@ -86,7 +89,7 @@
     /**
      * The descriptive information about this implementation.
      */
-    private static final String info = "DeltaManager/2.0";
+    private static final String info = "DeltaManager/2.1";
 
     /**
      * Has this component been started yet?
@@ -105,6 +108,11 @@
     private CatalinaCluster cluster = null;
 
     /**
+     * cached replication valve cluster container!
+     */
+    private ReplicationValve replicationValve = null ;
+    
+    /**
      * The lifecycle event support for this component.
      */
     protected LifecycleSupport lifecycle = new LifecycleSupport(this);
@@ -120,7 +128,7 @@
 
     private boolean notifySessionListenersOnReplication = true;
 
-    private boolean stateTransferred = false ;
+    private boolean stateTransfered = false ;
 
     private int stateTransferTimeout = 60;
 
@@ -364,12 +372,20 @@
         this.stateTransferTimeout = timeoutAllSession;
     }
 
-    public boolean getStateTransferred() {
-        return stateTransferred;
+    /**
+     * is session state transfered complete?
+     * 
+     */
+    public boolean getStateTransfered() {
+        return stateTransfered;
     }
 
-    public void setStateTransferred(boolean stateTransferred) {
-        this.stateTransferred = stateTransferred;
+    /**
+     * set that state ist complete transfered  
+     * @param stateTransfered
+     */
+    public void setStateTransfered(boolean stateTransfered) {
+        this.stateTransfered = stateTransfered;
     }
     
     /**
@@ -439,6 +455,7 @@
     }
     
     /**
+     * 
      * @return Returns the sendAllSessions.
      */
     public boolean isSendAllSessions() {
@@ -917,7 +934,7 @@
         lifecycle.fireLifecycleEvent(START_EVENT, null);
 
         // Force initialization of the random number generator
-        String dummy = generateSessionId();
+        generateSessionId();
 
         // Load unloaded sessions, if any
         try {
@@ -995,7 +1012,7 @@
             stateTransferCreateSendTime = beforeSendTime ;
             // request session state
             counterSend_EVT_GET_ALL_SESSIONS++;
-            stateTransferred = false ;
+            stateTransfered = false ;
             // FIXME This send call block the deploy thread, when sender waitForAck is enabled
             try {
                 synchronized(receivedMessageQueue) {
@@ -1048,6 +1065,35 @@
     }
 
     /**
+     * Register cross context session at replication valve thread local
+     * @param session cross context session
+     */
+    protected void registerSessionAtReplicationValve(DeltaSession session) {
+	    	if(replicationValve == null) {
+	    		if(container instanceof StandardContext
+	    				&& ((StandardContext)container).getCrossContext()) {
+	    			Cluster cluster = getCluster() ;
+	    			if(cluster != null && cluster instanceof CatalinaCluster) {
+	    				Valve[] valves = ((CatalinaCluster)cluster).getValves();
+	    				if(valves != null && valves.length > 0) {
+	    					for(int i=0; replicationValve == null && i < valves.length ; i++ ){
+	    						if(valves[i] instanceof ReplicationValve)
+	    							replicationValve = (ReplicationValve)valves[i] ;
+	    					}
+	    					
+	    					if(replicationValve == null && log.isDebugEnabled()) {
+	    						log.debug("no ReplicationValve found for CrossContext Support");
+	    					}
+	    				}
+	    			}
+	    		}
+	    	}
+	    	if(replicationValve != null) {
+	    		replicationValve.registerReplicationSession(session);
+	    	}
+    	}
+    
+    /**
      * Find the master of the session state
      * @return master member of sessions 
      */
@@ -1062,7 +1108,6 @@
                     mbr = member ;
             }
         } else {
-            // FIXME Why only the first Member?
             if(mbrs.length != 0 )
                 mbr = mbrs[0];
         }
@@ -1092,7 +1137,7 @@
                 }
                 reqNow = System.currentTimeMillis();
                 isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
-            } while ((!getStateTransferred()) && (!isTimeout));
+            } while ((!getStateTransfered()) && (!isTimeout));
         } else {
             if(getStateTransferTimeout() == -1) {
                 // wait that state is transfered
@@ -1101,11 +1146,11 @@
                         Thread.sleep(100);
                     } catch (Exception sleep) {
                     }
-                } while ((!getStateTransferred()));
+                } while ((!getStateTransfered()));
                 reqNow = System.currentTimeMillis();
             }
         }
-        if (isTimeout || (!getStateTransferred())) {
+        if (isTimeout || (!getStateTransfered())) {
             counterNoStateTransfered++ ;
             log.error(sm.getString("deltaManager.noSessionState",
                     getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
@@ -1156,6 +1201,7 @@
         // Require a new random number generator if we are restarted
         this.random = null;
         getCluster().removeManager(getName(),this);
+        replicationValve = null;
         if (initialized) {
             destroy();
         }
@@ -1174,8 +1220,6 @@
         // Validate the source of this event
         if (!(event.getSource() instanceof Context))
             return;
-        Context context = (Context) event.getSource();
-
         // Process a relevant property change
         if (event.getPropertyName().equals("sessionTimeout")) {
             try {
@@ -1243,29 +1287,35 @@
             DeltaSession session = (DeltaSession) findSession(sessionId);
             DeltaRequest deltaRequest = session.getDeltaRequest();
             SessionMessage msg = null;
-            if (deltaRequest.getSize() > 0) {
-
-                counterSend_EVT_SESSION_DELTA++;
-                byte[] data = unloadDeltaRequest(deltaRequest);
-                msg = new SessionMessageImpl(name,
-                        SessionMessage.EVT_SESSION_DELTA, data, sessionId,
-                        sessionId + "-" + System.currentTimeMillis());
-                session.resetDeltaRequest();
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString(
-                            "deltaManager.createMessage.delta",
-                            getName(), sessionId));
-                }
-                
-            } else if (!session.isPrimarySession()) {
-                counterSend_EVT_SESSION_ACCESSED++;
-                msg = new SessionMessageImpl(getName(),
-                        SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
-                        sessionId + "-" + System.currentTimeMillis());
+            boolean isDeltaRequest = false ;
+            synchronized(deltaRequest) {
+                isDeltaRequest = deltaRequest.getSize() > 0 ;
+                if (isDeltaRequest) {    
+                    counterSend_EVT_SESSION_DELTA++;
+                    byte[] data = unloadDeltaRequest(deltaRequest);
+                    msg = new SessionMessageImpl(getName(),
+                            SessionMessage.EVT_SESSION_DELTA, data, sessionId,
+                            sessionId + "-" + System.currentTimeMillis());
+                    session.resetDeltaRequest();
+                }  
+            }
+            if(!isDeltaRequest) {
+                if(!session.isPrimarySession()) {               
+                    counterSend_EVT_SESSION_ACCESSED++;
+                    msg = new SessionMessageImpl(getName(),
+                            SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
+                            sessionId + "-" + System.currentTimeMillis());
+                    if (log.isDebugEnabled()) {
+                        log.debug(sm.getString(
+                                "deltaManager.createMessage.accessChangePrimary",
+                                getName(), sessionId));
+                    }
+                }    
+            } else { // log only outside synch block!
                 if (log.isDebugEnabled()) {
                     log.debug(sm.getString(
-                            "deltaManager.createMessage.accessChangePrimary",
-                            getName(), sessionId));
+                        "deltaManager.createMessage.delta",
+                        getName(), sessionId));
                 }
             }
             session.setPrimarySession(true);
@@ -1493,7 +1543,7 @@
                     "deltaManager.receiveMessage.transfercomplete",
                     getName(), sender.getHost(), new Integer(sender.getPort())));
         stateTransferCreateSendTime = msg.getTimestamp() ;
-        stateTransferred = true ;
+        stateTransfered = true ;
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java Thu Jan  5 11:34:04 2006
@@ -42,12 +42,14 @@
 import javax.servlet.http.HttpSessionEvent;
 import javax.servlet.http.HttpSessionListener;
 
+import org.apache.catalina.Container;
 import org.apache.catalina.Context;
 import org.apache.catalina.Manager;
 import org.apache.catalina.Session;
 import org.apache.catalina.SessionEvent;
 import org.apache.catalina.SessionListener;
 import org.apache.catalina.cluster.ClusterSession;
+import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.realm.GenericPrincipal;
 import org.apache.catalina.util.Enumerator;
 import org.apache.catalina.util.StringManager;
@@ -136,12 +138,6 @@
     private long creationTime = 0L;
 
     /**
-     * The debugging detail level for this component. NOTE: This value is not
-     * included in the serialized version of this object.
-     */
-    private transient int debug = 0;
-
-    /**
      * We are currently processing a session expiration, so bypass certain
      * IllegalStateException tests. NOTE: This value is not included in the
      * serialized version of this object.
@@ -162,7 +158,7 @@
     /**
      * Descriptive information describing this Session implementation.
      */
-    private static final String info = "DeltaSession/1.0";
+    private static final String info = "DeltaSession/1.1";
 
     /**
      * The last accessed time for this Session.
@@ -426,9 +422,7 @@
                     }
                 }
             }
-        }//end if
-        //end fix
-
+        }
     }
 
     /**
@@ -656,6 +650,8 @@
     public void endAccess() {
         isNew = false;
         accessCount--;
+        if(manager instanceof DeltaManager)
+            ((DeltaManager)manager).registerSessionAtReplicationValve(this);
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Thu Jan  5 11:34:04 2006
@@ -35,17 +35,22 @@
 ReplicationTransmitter.setProperty=set property {0}: {1} old value {2}
 ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name {1}
 ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with name {1}
+ReplicationValve.crossContext.add=add Cross Context session replication container to replicationValve threadlocal
+ReplicationValve.crossContext.registerSession=register Cross context session id={0} from context {1}
+ReplicationValve.crossContext.remove=remove Cross Context session replication container from replicationValve threadlocal
+ReplicationValve.crossContext.sendDelta=send Cross Context session delta from context {0}.
 ReplicationValve.filter.loading=Loading request filters={0}
 ReplicationValve.filter.token=Request filter={0}
 ReplicationValve.filter.token.failure=Unable to compile filter={0}
 ReplicationValve.invoke.uri=Invoking replication request on {0}
 ReplicationValve.nocluster=No cluster configured for this request.
+ReplicationValve.resetDeltaRequest=Cluster is standalone: reset Session Request Delta at context {0}
 ReplicationValve.send.failure=Unable to perform replication request.
 ReplicationValve.send.invalid.failure=Unable to send session [id={0}] invalid message over cluster.
 ReplicationValve.session.found=Context {0}: Found session {1} but it isn't a ClusterSession.
 ReplicationValve.session.indicator=Context {0}: Primarity of session {0} in request attribute {1} is {2}.
 ReplicationValve.session.invalid=Context {0}: Requested session {1} is invalid, removed or not replicated at this node.
-ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests (Request={4} ms Cluster={5} ms).
+ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests {4} send requests {5} cross context requests (Request={6} ms Cluster={7} ms).
 SimpleTcpCluster.event.log=Cluster receive listener event {0} with data {1}
 SimpleTcpCluster.getProperty=get property {0}
 SimpleTcpCluster.setProperty=set property {0}: {1} old value {2}

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java Thu Jan  5 11:34:04 2006
@@ -19,17 +19,22 @@
 import java.io.IOException;
 import java.util.StringTokenizer;
 import java.util.regex.Pattern;
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 import javax.servlet.ServletException;
 
 import org.apache.catalina.Manager;
 import org.apache.catalina.Session;
+import org.apache.catalina.Context;
+import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.cluster.CatalinaCluster;
 import org.apache.catalina.cluster.ClusterManager;
 import org.apache.catalina.cluster.ClusterMessage;
 import org.apache.catalina.cluster.ClusterSession;
 import org.apache.catalina.cluster.ClusterValve;
 import org.apache.catalina.cluster.session.DeltaManager;
+import org.apache.catalina.cluster.session.DeltaSession;
 import org.apache.catalina.connector.Request;
 import org.apache.catalina.connector.Response;
 import org.apache.catalina.util.StringManager;
@@ -66,7 +71,7 @@
      * The descriptive information related to this implementation.
      */
     private static final String info =
-        "org.apache.catalina.cluster.tcp.ReplicationValve/1.2";
+        "org.apache.catalina.cluster.tcp.ReplicationValve/2.0";
 
 
     /**
@@ -81,20 +86,45 @@
      * holds file endings to not call for like images and others
      */
     protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0];
+    
+    /**
+     * Orginal filter 
+     */
     protected String filter ;
-
-    protected long totalRequestTime=0;
-    protected long totalSendTime=0;
-    protected long nrOfRequests =0;
-    protected long lastSendTime =0;
-    protected long nrOfFilterRequests=0;
+    
+    /**
+     * crossContext session container 
+     */
+    protected ThreadLocal crossContextSessions = new ThreadLocal() ;
+    
+    /**
+     * doProcessingStats (default = off)
+     */
+    protected boolean doProcessingStats = false;
+    
+    protected long totalRequestTime = 0;
+    protected long totalSendTime = 0;
+    protected long nrOfRequests = 0;
+    protected long lastSendTime = 0;
+    protected long nrOfFilterRequests = 0;
+    protected long nrOfSendRequests = 0;
+    protected long nrOfCrossContextSendRequests = 0;
+    
+    /**
+     * must primary change indicator set 
+     */
     protected boolean primaryIndicator = false ;
+    
+    /**
+     * Name of primary change indicator as request attributeĆ¢ 
+     */
     protected String primaryIndicatorName = "org.apache.catalina.cluster.tcp.isPrimarySession";
    
     // ------------------------------------------------------------- Properties
 
     public ReplicationValve() {
     }
+    
     /**
      * Return descriptive information about this Valve implementation.
      */
@@ -157,25 +187,43 @@
     public boolean isPrimaryIndicator() {
         return primaryIndicator;
     }
+
     /**
      * @param primaryIndicator The primaryIndicator to set.
      */
     public void setPrimaryIndicator(boolean primaryIndicator) {
         this.primaryIndicator = primaryIndicator;
     }
+    
     /**
      * @return Returns the primaryIndicatorName.
      */
     public String getPrimaryIndicatorName() {
         return primaryIndicatorName;
     }
+    
     /**
      * @param primaryIndicatorName The primaryIndicatorName to set.
      */
     public void setPrimaryIndicatorName(String primaryIndicatorName) {
         this.primaryIndicatorName = primaryIndicatorName;
     }
-      
+    
+    /**
+     * Calc processing stats
+     */
+    public boolean isDoProcessingStats() {
+        return doProcessingStats;
+    }
+
+    /**
+     * Set Calc processing stats
+     * @see #resetStatistics()
+     */
+    public void setDoProcessingStats(boolean doProcessingStats) {
+        this.doProcessingStats = doProcessingStats;
+    }
+
     /**
      * @return Returns the lastSendTime.
      */
@@ -198,6 +246,20 @@
     }
 
     /**
+     * @return Returns the nrOfCrossContextSendRequests.
+     */
+    public long getNrOfCrossContextSendRequests() {
+        return nrOfCrossContextSendRequests;
+    }
+
+    /**
+     * @return Returns the nrOfSendRequests.
+     */
+    public long getNrOfSendRequests() {
+        return nrOfSendRequests;
+    }
+
+    /**
      * @return Returns the totalRequestTime.
      */
     public long getTotalRequestTime() {
@@ -217,6 +279,7 @@
     protected java.util.regex.Pattern[] getReqFilters() {
         return reqFilters;
     }
+    
     /**
      * @param reqFilters The reqFilters to set.
      */
@@ -224,8 +287,28 @@
         this.reqFilters = reqFilters;
     }
     
+    
     // --------------------------------------------------------- Public Methods
     
+    /**
+     * Register all cross context sessions inside endAccess.
+     * Use a list with contains check, that the Portlet API can include a lot of fragments from same or
+     * different applications with session changes.
+     *
+     * @param session cross context session
+     */
+    public void registerReplicationSession(DeltaSession session) {
+        List sessions = (List)crossContextSessions.get();
+        if(sessions != null) {
+            if(!sessions.contains(session)) {
+                if(log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
+                        session.getIdInternal(),
+                        session.getManager().getContainer().getName()));
+                sessions.add(session);
+            }
+        }
+    }
 
     /**
      * Log the interesting request parameters, invoke the next Valve in the
@@ -240,45 +323,61 @@
     public void invoke(Request request, Response response)
         throws IOException, ServletException
     {
-        long totalstart = System.currentTimeMillis();
+        long totalstart = 0;
+
         //this happens before the request
-        if (primaryIndicator)
+        if(isDoProcessingStats()) {
+            totalstart = System.currentTimeMillis();
+        }
+        if (primaryIndicator) {
             createPrimaryIndicator(request) ;
-        getNext().invoke(request, response);
-        //this happens after the request
-        long start = System.currentTimeMillis();
-        Manager manager = request.getContext().getManager();
-        if (manager != null && manager instanceof ClusterManager) {
-            ClusterManager clusterManager = (ClusterManager) manager;
-            CatalinaCluster containerCluster = (CatalinaCluster) getContainer()
-                    .getCluster();
-            if (containerCluster == null) {
-                if (log.isWarnEnabled())
-                    log.warn(sm.getString("ReplicationValve.nocluster"));
-                return;
-            }
-            // valve cluster can access manager - other cluster handle replication 
-            // at host level - hopefully!
-            if(containerCluster.getManager(clusterManager.getName()) == null)
-                return ;
-            if(containerCluster.getMembers().length > 0  ) {
-                try {
-                    // send invalid sessions
-                    // DeltaManager returns String[0]
-                    if (!(clusterManager instanceof DeltaManager))
-                        sendInvalidSessions(clusterManager, containerCluster);
-                    // send replication
-                    sendSessionReplicationMessage(request, clusterManager, containerCluster);
-                } catch (Exception x) {
-                    log.error(sm.getString("ReplicationValve.send.failure"), x);
-                } finally {
-                    long stop = System.currentTimeMillis();
-                    updateStats(stop - totalstart, stop - start);
+        }
+        Context context = request.getContext();
+        boolean isCrossContext = context != null
+                && context instanceof StandardContext
+                && ((StandardContext) context).getCrossContext();
+        try {
+            if(isCrossContext) {
+                if(log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.crossContext.add"));
+                //FIXME add Pool of Arraylists
+                crossContextSessions.set(new ArrayList());
+            }
+            getNext().invoke(request, response);
+            Manager manager = request.getContext().getManager();
+            if (manager != null && manager instanceof ClusterManager) {
+                ClusterManager clusterManager = (ClusterManager) manager;
+                CatalinaCluster containerCluster = (CatalinaCluster) getContainer()
+                        .getCluster();
+                if (containerCluster == null) {
+                    if (log.isWarnEnabled())
+                        log.warn(sm.getString("ReplicationValve.nocluster"));
+                    return;
                 }
+                // valve cluster can access manager - other cluster handle replication 
+                // at host level - hopefully!
+                if(containerCluster.getManager(clusterManager.getName()) == null)
+                    return ;
+                if(containerCluster.hasMembers()) {
+                    sendRepilicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);
+                } else {
+                    resetReplicationRequest(request,isCrossContext);
+                }        
+            }
+        } finally {
+            // Array must be remove: Current master request send endAccess at recycle. 
+            // Don't register this request session again!
+            if(isCrossContext) {
+                if(log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.crossContext.remove"));
+                // crossContextSessions.remove() only exist at Java 5
+                // register ArrayList at a pool
+                crossContextSessions.set(null);
             }
         }
     }
-  
+
+    
     /**
      * reset the active statitics 
      */
@@ -288,6 +387,8 @@
         lastSendTime = 0 ;
         nrOfFilterRequests = 0 ;
         nrOfRequests = 0 ;
+        nrOfSendRequests = 0;
+        nrOfCrossContextSendRequests = 0;
     }
     
     /**
@@ -306,12 +407,99 @@
     // --------------------------------------------------------- Protected Methods
 
     /**
-     * Send Cluster Replication Request
-     * @see DeltaManager#requestCompleted(String)
-     * @see SimpleTcpCluster#send(ClusterMessage)
      * @param request
-     * @param manager
-     * @param cluster
+     * @param totalstart
+     * @param isCrossContext
+     * @param clusterManager
+     * @param containerCluster
+     */
+    protected void sendRepilicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {
+        //this happens after the request
+        long start = 0;
+        if(isDoProcessingStats()) {
+            start = System.currentTimeMillis();
+        }
+        try {
+            // send invalid sessions
+            // DeltaManager returns String[0]
+            if (!(clusterManager instanceof DeltaManager))
+                sendInvalidSessions(clusterManager, containerCluster);
+            // send replication
+            sendSessionReplicationMessage(request, clusterManager, containerCluster);
+            if(isCrossContext)
+                sendCrossContextSession(containerCluster);
+        } catch (Exception x) {
+            // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
+            log.error(sm.getString("ReplicationValve.send.failure"), x);
+        } finally {
+            // FIXME this stats update are not cheap!!
+            if(isDoProcessingStats()) {
+                updateStats(totalstart,start);
+            }
+        }
+    }
+
+    /**
+     * Send all changed cross context sessions to backups
+     * @param containerCluster
+     */
+    protected void sendCrossContextSession(CatalinaCluster containerCluster) {
+        Object sessions = crossContextSessions.get();
+        if(sessions != null && sessions instanceof List
+                && ((List)sessions).size() >0) {
+            for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) {          
+                Session session = (Session)iter.next();
+                if(log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",  
+                            session.getManager().getContainer().getName() ));
+                sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
+                if(isDoProcessingStats()) {
+                    nrOfCrossContextSendRequests++;
+                }
+            }
+        }
+    }
+  
+    /**
+     * Fix memory leak for long sessions with many changes, when no backup member exists!
+     * @param request current request after responce is generated
+     * @param isCrossContext check crosscontext threadlocal
+     */
+    protected void resetReplicationRequest(Request request, boolean isCrossContext) {
+        Session contextSession = request.getSessionInternal(false);
+        if(contextSession != null & contextSession instanceof DeltaSession){
+            resetDeltaRequest(contextSession);
+        }
+        if(isCrossContext) {
+            Object sessions = crossContextSessions.get();
+            if(sessions != null && sessions instanceof List
+               && ((List)sessions).size() >0) {
+                Iterator iter = ((List)sessions).iterator();
+                for(; iter.hasNext() ;) {          
+                    Session session = (Session)iter.next();
+                    resetDeltaRequest(session);
+                }
+            }
+        }                     
+    }
+
+    /**
+     * Reset DeltaRequest from session
+     * @param session HttpSession from current request or cross context session
+     */
+    protected void resetDeltaRequest(Session session) {
+        if(log.isDebugEnabled()) {
+            log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , 
+                session.getManager().getContainer().getName() ));
+        }
+        ((DeltaSession)session).resetDeltaRequest();
+    }
+
+    /**
+     * Send Cluster Replication Request
+     * @param request current request
+     * @param manager session manager
+     * @param cluster replication cluster
      */
     protected void sendSessionReplicationMessage(Request request,
             ClusterManager manager, CatalinaCluster cluster) {
@@ -320,26 +508,50 @@
             String uri = request.getDecodedRequestURI();
             // request without session change
             if (!isRequestWithoutSessionChange(uri)) {
-
                 if (log.isDebugEnabled())
                     log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
-                String id = session.getIdInternal();
-                if (id != null) {
-                    ClusterMessage msg = manager.requestCompleted(id);
-                    // really send replication send request
-                    // FIXME send directly via ClusterManager.send
-                    if (msg != null) {
-                        if(manager.isSendClusterDomainOnly())
-                            cluster.sendClusterDomain(msg);
-                        else
-                            cluster.send(msg);
-                    }
-                }
+                sendMessage(session,manager,cluster);
             } else
-                nrOfFilterRequests++;
+                if(isDoProcessingStats())
+                    nrOfFilterRequests++;
         }
 
     }
+
+   /**
+    * Send message delta message from request session 
+    * @param request current request
+    * @param manager session manager
+    * @param cluster replication cluster
+    */
+    protected void sendMessage(Session session,
+             ClusterManager manager, CatalinaCluster cluster) {
+        String id = session.getIdInternal();
+        if (id != null) {
+            send(manager, cluster, id);
+        }
+    }
+
+    /**
+     * send manager requestCompleted message to cluster
+     * @param manager SessionManager
+     * @param cluster replication cluster
+     * @param sessionId sessionid from the manager
+     * @see DeltaManager#requestCompleted(String)
+     * @see SimpleTcpCluster#send(ClusterMessage)
+     */
+    protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
+        ClusterMessage msg = manager.requestCompleted(sessionId);
+        if (msg != null) {
+            if(manager.isSendClusterDomainOnly()) {
+                cluster.sendClusterDomain(msg);
+            } else {
+                cluster.send(msg);
+            }
+            if(isDoProcessingStats())
+                nrOfSendRequests++;
+        }
+    }
     
     /**
      * check for session invalidations
@@ -351,14 +563,7 @@
         if ( invalidIds.length > 0 ) {
             for ( int i=0;i<invalidIds.length; i++ ) {
                 try {
-                    ClusterMessage imsg = manager.requestCompleted(invalidIds[i]);
-                    // FIXME send directly via ClusterManager.send
-                    if (imsg != null) {
-                        if(manager.isSendClusterDomainOnly())
-                            cluster.sendClusterDomain(imsg);
-                        else
-                            cluster.send(imsg);
-                    }
+                    send(manager,cluster,invalidIds[i]);
                 } catch ( Exception x ) {
                     log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
                 }
@@ -387,23 +592,27 @@
      * @param requestTime
      * @param clusterTime
      */
-    protected synchronized void updateStats(long requestTime, long clusterTime) {
-        totalSendTime+=clusterTime;
-        totalRequestTime+=requestTime;
-        nrOfRequests++;
-        if ( (nrOfRequests % 100) == 0 ) {
-            if(log.isInfoEnabled()) {
+    protected  void updateStats(long requestTime, long clusterTime) {
+        synchronized(this) {
+            lastSendTime=System.currentTimeMillis();
+            totalSendTime+=lastSendTime - clusterTime;
+            totalRequestTime+=lastSendTime - requestTime;
+            nrOfRequests++;
+        }
+        if(log.isInfoEnabled()) {
+            if ( (nrOfRequests % 100) == 0 ) {
                  log.info(sm.getString("ReplicationValve.stats",
                      new Object[]{
                          new Long(totalRequestTime/nrOfRequests),
                          new Long(totalSendTime/nrOfRequests),
                          new Long(nrOfRequests),
+                         new Long(nrOfSendRequests),
+                         new Long(nrOfCrossContextSendRequests),
                          new Long(nrOfFilterRequests),
                          new Long(totalRequestTime),
                          new Long(totalSendTime)}));
              }
         }
-        lastSendTime=System.currentTimeMillis();
     }
 
 
@@ -442,7 +651,5 @@
             }
         }
     }
-
-
 
 }

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Thu Jan  5 11:34:04 2006
@@ -988,6 +988,10 @@
                domain="Catalina"
                 group="Valve"
                  type="org.apache.catalina.cluster.tcp.ReplicationValve">
+    <attribute   name="info"
+          description="Class version info"
+                 type="java.lang.String"
+                 writeable="false"/>
     <attribute   name="filter"
           description="resource filter to disable session replication check"
                  type="java.lang.String"/>
@@ -998,12 +1002,24 @@
     <attribute   name="primaryIndicatorName"
           description="Request attribute name to indicate that request processing is at primary session node"
                  type="java.lang.String"/>
+    <attribute   name="doProcessingStats"
+ 			     is="true"
+          description="active statistics counting"
+                 type="boolean"/>
 	<attribute   name="nrOfRequests"
           description="number of replicated requests"
                  type="long"
                  writeable="false"/>
 	<attribute   name="nrOfFilterRequests"
           description="number of filtered requests"
+                 type="long"
+                 writeable="false"/>
+	<attribute   name="nrOfSendRequests"
+          description="number of send requests"
+                 type="long"
+                 writeable="false"/>
+	<attribute   name="nrOfCrossContextSendRequests"
+          description="number of send cross context session requests"
                  type="long"
                  writeable="false"/>
     <attribute   name="totalRequestTime"



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org