You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2006/01/05 20:34:13 UTC
svn commit: r366255 - in
/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster:
session/DeltaManager.java session/DeltaSession.java
tcp/LocalStrings.properties tcp/ReplicationValve.java
tcp/mbeans-descriptors.xml
Author: pero
Date: Thu Jan 5 11:34:04 2006
New Revision: 366255
URL: http://svn.apache.org/viewcvs?rev=366255&view=rev
Log:
Add support for cross context session replication
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Thu Jan 5 11:34:04 2006
@@ -34,6 +34,7 @@
import org.apache.catalina.Context;
import org.apache.catalina.Engine;
import org.apache.catalina.Host;
+import org.apache.catalina.Valve;
import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
@@ -44,9 +45,11 @@
import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.session.ManagerBase;
+import org.apache.catalina.cluster.tcp.ReplicationValve;
import org.apache.catalina.util.CustomObjectInputStream;
import org.apache.catalina.util.LifecycleSupport;
import org.apache.catalina.util.StringManager;
+import org.apache.catalina.core.StandardContext;
/**
* The DeltaManager manages replicated sessions by only replicating the deltas
@@ -86,7 +89,7 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "DeltaManager/2.0";
+ private static final String info = "DeltaManager/2.1";
/**
* Has this component been started yet?
@@ -105,6 +108,11 @@
private CatalinaCluster cluster = null;
/**
+ * cached replication valve cluster container!
+ */
+ private ReplicationValve replicationValve = null ;
+
+ /**
* The lifecycle event support for this component.
*/
protected LifecycleSupport lifecycle = new LifecycleSupport(this);
@@ -120,7 +128,7 @@
private boolean notifySessionListenersOnReplication = true;
- private boolean stateTransferred = false ;
+ private boolean stateTransfered = false ;
private int stateTransferTimeout = 60;
@@ -364,12 +372,20 @@
this.stateTransferTimeout = timeoutAllSession;
}
- public boolean getStateTransferred() {
- return stateTransferred;
+ /**
+ * is session state transfered complete?
+ *
+ */
+ public boolean getStateTransfered() {
+ return stateTransfered;
}
- public void setStateTransferred(boolean stateTransferred) {
- this.stateTransferred = stateTransferred;
+ /**
+ * set that state ist complete transfered
+ * @param stateTransfered
+ */
+ public void setStateTransfered(boolean stateTransfered) {
+ this.stateTransfered = stateTransfered;
}
/**
@@ -439,6 +455,7 @@
}
/**
+ *
* @return Returns the sendAllSessions.
*/
public boolean isSendAllSessions() {
@@ -917,7 +934,7 @@
lifecycle.fireLifecycleEvent(START_EVENT, null);
// Force initialization of the random number generator
- String dummy = generateSessionId();
+ generateSessionId();
// Load unloaded sessions, if any
try {
@@ -995,7 +1012,7 @@
stateTransferCreateSendTime = beforeSendTime ;
// request session state
counterSend_EVT_GET_ALL_SESSIONS++;
- stateTransferred = false ;
+ stateTransfered = false ;
// FIXME This send call block the deploy thread, when sender waitForAck is enabled
try {
synchronized(receivedMessageQueue) {
@@ -1048,6 +1065,35 @@
}
/**
+ * Register cross context session at replication valve thread local
+ * @param session cross context session
+ */
+ protected void registerSessionAtReplicationValve(DeltaSession session) {
+ if(replicationValve == null) {
+ if(container instanceof StandardContext
+ && ((StandardContext)container).getCrossContext()) {
+ Cluster cluster = getCluster() ;
+ if(cluster != null && cluster instanceof CatalinaCluster) {
+ Valve[] valves = ((CatalinaCluster)cluster).getValves();
+ if(valves != null && valves.length > 0) {
+ for(int i=0; replicationValve == null && i < valves.length ; i++ ){
+ if(valves[i] instanceof ReplicationValve)
+ replicationValve = (ReplicationValve)valves[i] ;
+ }
+
+ if(replicationValve == null && log.isDebugEnabled()) {
+ log.debug("no ReplicationValve found for CrossContext Support");
+ }
+ }
+ }
+ }
+ }
+ if(replicationValve != null) {
+ replicationValve.registerReplicationSession(session);
+ }
+ }
+
+ /**
* Find the master of the session state
* @return master member of sessions
*/
@@ -1062,7 +1108,6 @@
mbr = member ;
}
} else {
- // FIXME Why only the first Member?
if(mbrs.length != 0 )
mbr = mbrs[0];
}
@@ -1092,7 +1137,7 @@
}
reqNow = System.currentTimeMillis();
isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
- } while ((!getStateTransferred()) && (!isTimeout));
+ } while ((!getStateTransfered()) && (!isTimeout));
} else {
if(getStateTransferTimeout() == -1) {
// wait that state is transfered
@@ -1101,11 +1146,11 @@
Thread.sleep(100);
} catch (Exception sleep) {
}
- } while ((!getStateTransferred()));
+ } while ((!getStateTransfered()));
reqNow = System.currentTimeMillis();
}
}
- if (isTimeout || (!getStateTransferred())) {
+ if (isTimeout || (!getStateTransfered())) {
counterNoStateTransfered++ ;
log.error(sm.getString("deltaManager.noSessionState",
getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
@@ -1156,6 +1201,7 @@
// Require a new random number generator if we are restarted
this.random = null;
getCluster().removeManager(getName(),this);
+ replicationValve = null;
if (initialized) {
destroy();
}
@@ -1174,8 +1220,6 @@
// Validate the source of this event
if (!(event.getSource() instanceof Context))
return;
- Context context = (Context) event.getSource();
-
// Process a relevant property change
if (event.getPropertyName().equals("sessionTimeout")) {
try {
@@ -1243,29 +1287,35 @@
DeltaSession session = (DeltaSession) findSession(sessionId);
DeltaRequest deltaRequest = session.getDeltaRequest();
SessionMessage msg = null;
- if (deltaRequest.getSize() > 0) {
-
- counterSend_EVT_SESSION_DELTA++;
- byte[] data = unloadDeltaRequest(deltaRequest);
- msg = new SessionMessageImpl(name,
- SessionMessage.EVT_SESSION_DELTA, data, sessionId,
- sessionId + "-" + System.currentTimeMillis());
- session.resetDeltaRequest();
- if (log.isDebugEnabled()) {
- log.debug(sm.getString(
- "deltaManager.createMessage.delta",
- getName(), sessionId));
- }
-
- } else if (!session.isPrimarySession()) {
- counterSend_EVT_SESSION_ACCESSED++;
- msg = new SessionMessageImpl(getName(),
- SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
- sessionId + "-" + System.currentTimeMillis());
+ boolean isDeltaRequest = false ;
+ synchronized(deltaRequest) {
+ isDeltaRequest = deltaRequest.getSize() > 0 ;
+ if (isDeltaRequest) {
+ counterSend_EVT_SESSION_DELTA++;
+ byte[] data = unloadDeltaRequest(deltaRequest);
+ msg = new SessionMessageImpl(getName(),
+ SessionMessage.EVT_SESSION_DELTA, data, sessionId,
+ sessionId + "-" + System.currentTimeMillis());
+ session.resetDeltaRequest();
+ }
+ }
+ if(!isDeltaRequest) {
+ if(!session.isPrimarySession()) {
+ counterSend_EVT_SESSION_ACCESSED++;
+ msg = new SessionMessageImpl(getName(),
+ SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
+ sessionId + "-" + System.currentTimeMillis());
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString(
+ "deltaManager.createMessage.accessChangePrimary",
+ getName(), sessionId));
+ }
+ }
+ } else { // log only outside synch block!
if (log.isDebugEnabled()) {
log.debug(sm.getString(
- "deltaManager.createMessage.accessChangePrimary",
- getName(), sessionId));
+ "deltaManager.createMessage.delta",
+ getName(), sessionId));
}
}
session.setPrimarySession(true);
@@ -1493,7 +1543,7 @@
"deltaManager.receiveMessage.transfercomplete",
getName(), sender.getHost(), new Integer(sender.getPort())));
stateTransferCreateSendTime = msg.getTimestamp() ;
- stateTransferred = true ;
+ stateTransfered = true ;
}
/**
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java Thu Jan 5 11:34:04 2006
@@ -42,12 +42,14 @@
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
+import org.apache.catalina.Container;
import org.apache.catalina.Context;
import org.apache.catalina.Manager;
import org.apache.catalina.Session;
import org.apache.catalina.SessionEvent;
import org.apache.catalina.SessionListener;
import org.apache.catalina.cluster.ClusterSession;
+import org.apache.catalina.core.StandardContext;
import org.apache.catalina.realm.GenericPrincipal;
import org.apache.catalina.util.Enumerator;
import org.apache.catalina.util.StringManager;
@@ -136,12 +138,6 @@
private long creationTime = 0L;
/**
- * The debugging detail level for this component. NOTE: This value is not
- * included in the serialized version of this object.
- */
- private transient int debug = 0;
-
- /**
* We are currently processing a session expiration, so bypass certain
* IllegalStateException tests. NOTE: This value is not included in the
* serialized version of this object.
@@ -162,7 +158,7 @@
/**
* Descriptive information describing this Session implementation.
*/
- private static final String info = "DeltaSession/1.0";
+ private static final String info = "DeltaSession/1.1";
/**
* The last accessed time for this Session.
@@ -426,9 +422,7 @@
}
}
}
- }//end if
- //end fix
-
+ }
}
/**
@@ -656,6 +650,8 @@
public void endAccess() {
isNew = false;
accessCount--;
+ if(manager instanceof DeltaManager)
+ ((DeltaManager)manager).registerSessionAtReplicationValve(this);
}
/**
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Thu Jan 5 11:34:04 2006
@@ -35,17 +35,22 @@
ReplicationTransmitter.setProperty=set property {0}: {1} old value {2}
ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name {1}
ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with name {1}
+ReplicationValve.crossContext.add=add Cross Context session replication container to replicationValve threadlocal
+ReplicationValve.crossContext.registerSession=register Cross context session id={0} from context {1}
+ReplicationValve.crossContext.remove=remove Cross Context session replication container from replicationValve threadlocal
+ReplicationValve.crossContext.sendDelta=send Cross Context session delta from context {0}.
ReplicationValve.filter.loading=Loading request filters={0}
ReplicationValve.filter.token=Request filter={0}
ReplicationValve.filter.token.failure=Unable to compile filter={0}
ReplicationValve.invoke.uri=Invoking replication request on {0}
ReplicationValve.nocluster=No cluster configured for this request.
+ReplicationValve.resetDeltaRequest=Cluster is standalone: reset Session Request Delta at context {0}
ReplicationValve.send.failure=Unable to perform replication request.
ReplicationValve.send.invalid.failure=Unable to send session [id={0}] invalid message over cluster.
ReplicationValve.session.found=Context {0}: Found session {1} but it isn't a ClusterSession.
ReplicationValve.session.indicator=Context {0}: Primarity of session {0} in request attribute {1} is {2}.
ReplicationValve.session.invalid=Context {0}: Requested session {1} is invalid, removed or not replicated at this node.
-ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests (Request={4} ms Cluster={5} ms).
+ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests {4} send requests {5} cross context requests (Request={6} ms Cluster={7} ms).
SimpleTcpCluster.event.log=Cluster receive listener event {0} with data {1}
SimpleTcpCluster.getProperty=get property {0}
SimpleTcpCluster.setProperty=set property {0}: {1} old value {2}
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java Thu Jan 5 11:34:04 2006
@@ -19,17 +19,22 @@
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.Session;
+import org.apache.catalina.Context;
+import org.apache.catalina.core.StandardContext;
import org.apache.catalina.cluster.CatalinaCluster;
import org.apache.catalina.cluster.ClusterManager;
import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.ClusterSession;
import org.apache.catalina.cluster.ClusterValve;
import org.apache.catalina.cluster.session.DeltaManager;
+import org.apache.catalina.cluster.session.DeltaSession;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.util.StringManager;
@@ -66,7 +71,7 @@
* The descriptive information related to this implementation.
*/
private static final String info =
- "org.apache.catalina.cluster.tcp.ReplicationValve/1.2";
+ "org.apache.catalina.cluster.tcp.ReplicationValve/2.0";
/**
@@ -81,20 +86,45 @@
* holds file endings to not call for like images and others
*/
protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0];
+
+ /**
+ * Orginal filter
+ */
protected String filter ;
-
- protected long totalRequestTime=0;
- protected long totalSendTime=0;
- protected long nrOfRequests =0;
- protected long lastSendTime =0;
- protected long nrOfFilterRequests=0;
+
+ /**
+ * crossContext session container
+ */
+ protected ThreadLocal crossContextSessions = new ThreadLocal() ;
+
+ /**
+ * doProcessingStats (default = off)
+ */
+ protected boolean doProcessingStats = false;
+
+ protected long totalRequestTime = 0;
+ protected long totalSendTime = 0;
+ protected long nrOfRequests = 0;
+ protected long lastSendTime = 0;
+ protected long nrOfFilterRequests = 0;
+ protected long nrOfSendRequests = 0;
+ protected long nrOfCrossContextSendRequests = 0;
+
+ /**
+ * must primary change indicator set
+ */
protected boolean primaryIndicator = false ;
+
+ /**
+ * Name of primary change indicator as request attributeĆ¢
+ */
protected String primaryIndicatorName = "org.apache.catalina.cluster.tcp.isPrimarySession";
// ------------------------------------------------------------- Properties
public ReplicationValve() {
}
+
/**
* Return descriptive information about this Valve implementation.
*/
@@ -157,25 +187,43 @@
public boolean isPrimaryIndicator() {
return primaryIndicator;
}
+
/**
* @param primaryIndicator The primaryIndicator to set.
*/
public void setPrimaryIndicator(boolean primaryIndicator) {
this.primaryIndicator = primaryIndicator;
}
+
/**
* @return Returns the primaryIndicatorName.
*/
public String getPrimaryIndicatorName() {
return primaryIndicatorName;
}
+
/**
* @param primaryIndicatorName The primaryIndicatorName to set.
*/
public void setPrimaryIndicatorName(String primaryIndicatorName) {
this.primaryIndicatorName = primaryIndicatorName;
}
-
+
+ /**
+ * Calc processing stats
+ */
+ public boolean isDoProcessingStats() {
+ return doProcessingStats;
+ }
+
+ /**
+ * Set Calc processing stats
+ * @see #resetStatistics()
+ */
+ public void setDoProcessingStats(boolean doProcessingStats) {
+ this.doProcessingStats = doProcessingStats;
+ }
+
/**
* @return Returns the lastSendTime.
*/
@@ -198,6 +246,20 @@
}
/**
+ * @return Returns the nrOfCrossContextSendRequests.
+ */
+ public long getNrOfCrossContextSendRequests() {
+ return nrOfCrossContextSendRequests;
+ }
+
+ /**
+ * @return Returns the nrOfSendRequests.
+ */
+ public long getNrOfSendRequests() {
+ return nrOfSendRequests;
+ }
+
+ /**
* @return Returns the totalRequestTime.
*/
public long getTotalRequestTime() {
@@ -217,6 +279,7 @@
protected java.util.regex.Pattern[] getReqFilters() {
return reqFilters;
}
+
/**
* @param reqFilters The reqFilters to set.
*/
@@ -224,8 +287,28 @@
this.reqFilters = reqFilters;
}
+
// --------------------------------------------------------- Public Methods
+ /**
+ * Register all cross context sessions inside endAccess.
+ * Use a list with contains check, that the Portlet API can include a lot of fragments from same or
+ * different applications with session changes.
+ *
+ * @param session cross context session
+ */
+ public void registerReplicationSession(DeltaSession session) {
+ List sessions = (List)crossContextSessions.get();
+ if(sessions != null) {
+ if(!sessions.contains(session)) {
+ if(log.isDebugEnabled())
+ log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
+ session.getIdInternal(),
+ session.getManager().getContainer().getName()));
+ sessions.add(session);
+ }
+ }
+ }
/**
* Log the interesting request parameters, invoke the next Valve in the
@@ -240,45 +323,61 @@
public void invoke(Request request, Response response)
throws IOException, ServletException
{
- long totalstart = System.currentTimeMillis();
+ long totalstart = 0;
+
//this happens before the request
- if (primaryIndicator)
+ if(isDoProcessingStats()) {
+ totalstart = System.currentTimeMillis();
+ }
+ if (primaryIndicator) {
createPrimaryIndicator(request) ;
- getNext().invoke(request, response);
- //this happens after the request
- long start = System.currentTimeMillis();
- Manager manager = request.getContext().getManager();
- if (manager != null && manager instanceof ClusterManager) {
- ClusterManager clusterManager = (ClusterManager) manager;
- CatalinaCluster containerCluster = (CatalinaCluster) getContainer()
- .getCluster();
- if (containerCluster == null) {
- if (log.isWarnEnabled())
- log.warn(sm.getString("ReplicationValve.nocluster"));
- return;
- }
- // valve cluster can access manager - other cluster handle replication
- // at host level - hopefully!
- if(containerCluster.getManager(clusterManager.getName()) == null)
- return ;
- if(containerCluster.getMembers().length > 0 ) {
- try {
- // send invalid sessions
- // DeltaManager returns String[0]
- if (!(clusterManager instanceof DeltaManager))
- sendInvalidSessions(clusterManager, containerCluster);
- // send replication
- sendSessionReplicationMessage(request, clusterManager, containerCluster);
- } catch (Exception x) {
- log.error(sm.getString("ReplicationValve.send.failure"), x);
- } finally {
- long stop = System.currentTimeMillis();
- updateStats(stop - totalstart, stop - start);
+ }
+ Context context = request.getContext();
+ boolean isCrossContext = context != null
+ && context instanceof StandardContext
+ && ((StandardContext) context).getCrossContext();
+ try {
+ if(isCrossContext) {
+ if(log.isDebugEnabled())
+ log.debug(sm.getString("ReplicationValve.crossContext.add"));
+ //FIXME add Pool of Arraylists
+ crossContextSessions.set(new ArrayList());
+ }
+ getNext().invoke(request, response);
+ Manager manager = request.getContext().getManager();
+ if (manager != null && manager instanceof ClusterManager) {
+ ClusterManager clusterManager = (ClusterManager) manager;
+ CatalinaCluster containerCluster = (CatalinaCluster) getContainer()
+ .getCluster();
+ if (containerCluster == null) {
+ if (log.isWarnEnabled())
+ log.warn(sm.getString("ReplicationValve.nocluster"));
+ return;
}
+ // valve cluster can access manager - other cluster handle replication
+ // at host level - hopefully!
+ if(containerCluster.getManager(clusterManager.getName()) == null)
+ return ;
+ if(containerCluster.hasMembers()) {
+ sendRepilicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);
+ } else {
+ resetReplicationRequest(request,isCrossContext);
+ }
+ }
+ } finally {
+ // Array must be remove: Current master request send endAccess at recycle.
+ // Don't register this request session again!
+ if(isCrossContext) {
+ if(log.isDebugEnabled())
+ log.debug(sm.getString("ReplicationValve.crossContext.remove"));
+ // crossContextSessions.remove() only exist at Java 5
+ // register ArrayList at a pool
+ crossContextSessions.set(null);
}
}
}
-
+
+
/**
* reset the active statitics
*/
@@ -288,6 +387,8 @@
lastSendTime = 0 ;
nrOfFilterRequests = 0 ;
nrOfRequests = 0 ;
+ nrOfSendRequests = 0;
+ nrOfCrossContextSendRequests = 0;
}
/**
@@ -306,12 +407,99 @@
// --------------------------------------------------------- Protected Methods
/**
- * Send Cluster Replication Request
- * @see DeltaManager#requestCompleted(String)
- * @see SimpleTcpCluster#send(ClusterMessage)
* @param request
- * @param manager
- * @param cluster
+ * @param totalstart
+ * @param isCrossContext
+ * @param clusterManager
+ * @param containerCluster
+ */
+ protected void sendRepilicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {
+ //this happens after the request
+ long start = 0;
+ if(isDoProcessingStats()) {
+ start = System.currentTimeMillis();
+ }
+ try {
+ // send invalid sessions
+ // DeltaManager returns String[0]
+ if (!(clusterManager instanceof DeltaManager))
+ sendInvalidSessions(clusterManager, containerCluster);
+ // send replication
+ sendSessionReplicationMessage(request, clusterManager, containerCluster);
+ if(isCrossContext)
+ sendCrossContextSession(containerCluster);
+ } catch (Exception x) {
+ // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
+ log.error(sm.getString("ReplicationValve.send.failure"), x);
+ } finally {
+ // FIXME this stats update are not cheap!!
+ if(isDoProcessingStats()) {
+ updateStats(totalstart,start);
+ }
+ }
+ }
+
+ /**
+ * Send all changed cross context sessions to backups
+ * @param containerCluster
+ */
+ protected void sendCrossContextSession(CatalinaCluster containerCluster) {
+ Object sessions = crossContextSessions.get();
+ if(sessions != null && sessions instanceof List
+ && ((List)sessions).size() >0) {
+ for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) {
+ Session session = (Session)iter.next();
+ if(log.isDebugEnabled())
+ log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",
+ session.getManager().getContainer().getName() ));
+ sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
+ if(isDoProcessingStats()) {
+ nrOfCrossContextSendRequests++;
+ }
+ }
+ }
+ }
+
+ /**
+ * Fix memory leak for long sessions with many changes, when no backup member exists!
+ * @param request current request after responce is generated
+ * @param isCrossContext check crosscontext threadlocal
+ */
+ protected void resetReplicationRequest(Request request, boolean isCrossContext) {
+ Session contextSession = request.getSessionInternal(false);
+ if(contextSession != null & contextSession instanceof DeltaSession){
+ resetDeltaRequest(contextSession);
+ }
+ if(isCrossContext) {
+ Object sessions = crossContextSessions.get();
+ if(sessions != null && sessions instanceof List
+ && ((List)sessions).size() >0) {
+ Iterator iter = ((List)sessions).iterator();
+ for(; iter.hasNext() ;) {
+ Session session = (Session)iter.next();
+ resetDeltaRequest(session);
+ }
+ }
+ }
+ }
+
+ /**
+ * Reset DeltaRequest from session
+ * @param session HttpSession from current request or cross context session
+ */
+ protected void resetDeltaRequest(Session session) {
+ if(log.isDebugEnabled()) {
+ log.debug(sm.getString("ReplicationValve.resetDeltaRequest" ,
+ session.getManager().getContainer().getName() ));
+ }
+ ((DeltaSession)session).resetDeltaRequest();
+ }
+
+ /**
+ * Send Cluster Replication Request
+ * @param request current request
+ * @param manager session manager
+ * @param cluster replication cluster
*/
protected void sendSessionReplicationMessage(Request request,
ClusterManager manager, CatalinaCluster cluster) {
@@ -320,26 +508,50 @@
String uri = request.getDecodedRequestURI();
// request without session change
if (!isRequestWithoutSessionChange(uri)) {
-
if (log.isDebugEnabled())
log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
- String id = session.getIdInternal();
- if (id != null) {
- ClusterMessage msg = manager.requestCompleted(id);
- // really send replication send request
- // FIXME send directly via ClusterManager.send
- if (msg != null) {
- if(manager.isSendClusterDomainOnly())
- cluster.sendClusterDomain(msg);
- else
- cluster.send(msg);
- }
- }
+ sendMessage(session,manager,cluster);
} else
- nrOfFilterRequests++;
+ if(isDoProcessingStats())
+ nrOfFilterRequests++;
}
}
+
+ /**
+ * Send message delta message from request session
+ * @param request current request
+ * @param manager session manager
+ * @param cluster replication cluster
+ */
+ protected void sendMessage(Session session,
+ ClusterManager manager, CatalinaCluster cluster) {
+ String id = session.getIdInternal();
+ if (id != null) {
+ send(manager, cluster, id);
+ }
+ }
+
+ /**
+ * send manager requestCompleted message to cluster
+ * @param manager SessionManager
+ * @param cluster replication cluster
+ * @param sessionId sessionid from the manager
+ * @see DeltaManager#requestCompleted(String)
+ * @see SimpleTcpCluster#send(ClusterMessage)
+ */
+ protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
+ ClusterMessage msg = manager.requestCompleted(sessionId);
+ if (msg != null) {
+ if(manager.isSendClusterDomainOnly()) {
+ cluster.sendClusterDomain(msg);
+ } else {
+ cluster.send(msg);
+ }
+ if(isDoProcessingStats())
+ nrOfSendRequests++;
+ }
+ }
/**
* check for session invalidations
@@ -351,14 +563,7 @@
if ( invalidIds.length > 0 ) {
for ( int i=0;i<invalidIds.length; i++ ) {
try {
- ClusterMessage imsg = manager.requestCompleted(invalidIds[i]);
- // FIXME send directly via ClusterManager.send
- if (imsg != null) {
- if(manager.isSendClusterDomainOnly())
- cluster.sendClusterDomain(imsg);
- else
- cluster.send(imsg);
- }
+ send(manager,cluster,invalidIds[i]);
} catch ( Exception x ) {
log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
}
@@ -387,23 +592,27 @@
* @param requestTime
* @param clusterTime
*/
- protected synchronized void updateStats(long requestTime, long clusterTime) {
- totalSendTime+=clusterTime;
- totalRequestTime+=requestTime;
- nrOfRequests++;
- if ( (nrOfRequests % 100) == 0 ) {
- if(log.isInfoEnabled()) {
+ protected void updateStats(long requestTime, long clusterTime) {
+ synchronized(this) {
+ lastSendTime=System.currentTimeMillis();
+ totalSendTime+=lastSendTime - clusterTime;
+ totalRequestTime+=lastSendTime - requestTime;
+ nrOfRequests++;
+ }
+ if(log.isInfoEnabled()) {
+ if ( (nrOfRequests % 100) == 0 ) {
log.info(sm.getString("ReplicationValve.stats",
new Object[]{
new Long(totalRequestTime/nrOfRequests),
new Long(totalSendTime/nrOfRequests),
new Long(nrOfRequests),
+ new Long(nrOfSendRequests),
+ new Long(nrOfCrossContextSendRequests),
new Long(nrOfFilterRequests),
new Long(totalRequestTime),
new Long(totalSendTime)}));
}
}
- lastSendTime=System.currentTimeMillis();
}
@@ -442,7 +651,5 @@
}
}
}
-
-
}
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml?rev=366255&r1=366254&r2=366255&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Thu Jan 5 11:34:04 2006
@@ -988,6 +988,10 @@
domain="Catalina"
group="Valve"
type="org.apache.catalina.cluster.tcp.ReplicationValve">
+ <attribute name="info"
+ description="Class version info"
+ type="java.lang.String"
+ writeable="false"/>
<attribute name="filter"
description="resource filter to disable session replication check"
type="java.lang.String"/>
@@ -998,12 +1002,24 @@
<attribute name="primaryIndicatorName"
description="Request attribute name to indicate that request processing is at primary session node"
type="java.lang.String"/>
+ <attribute name="doProcessingStats"
+ is="true"
+ description="active statistics counting"
+ type="boolean"/>
<attribute name="nrOfRequests"
description="number of replicated requests"
type="long"
writeable="false"/>
<attribute name="nrOfFilterRequests"
description="number of filtered requests"
+ type="long"
+ writeable="false"/>
+ <attribute name="nrOfSendRequests"
+ description="number of send requests"
+ type="long"
+ writeable="false"/>
+ <attribute name="nrOfCrossContextSendRequests"
+ description="number of send cross context session requests"
type="long"
writeable="false"/>
<attribute name="totalRequestTime"
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org