You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/20 19:07:31 UTC
svn commit: r387267 - in
/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha:
session/DeltaManager.java tcp/SimpleTcpCluster.java
util/IDynamicProperty.java
Author: fhanik
Date: Mon Mar 20 10:07:29 2006
New Revision: 387267
URL: http://svn.apache.org/viewcvs?rev=387267&view=rev
Log:
Cleaned up formatting in the DeltaManager; no code changes. A clean base is needed to create a backup manager that uses a replicated map and hence does not need to do any message transfers.
Added:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java?rev=387267&r1=387266&r2=387267&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java Mon Mar 20 10:07:29 2006
@@ -17,10 +17,7 @@
package org.apache.catalina.ha.session;
import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -34,22 +31,18 @@
import org.apache.catalina.Context;
import org.apache.catalina.Engine;
import org.apache.catalina.Host;
-import org.apache.catalina.Valve;
-import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
-import org.apache.catalina.Loader;
import org.apache.catalina.Session;
+import org.apache.catalina.Valve;
+import org.apache.catalina.core.StandardContext;
import org.apache.catalina.ha.CatalinaCluster;
-import org.apache.catalina.ha.ClusterManager;
import org.apache.catalina.ha.ClusterMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.session.ManagerBase;
import org.apache.catalina.ha.tcp.ReplicationValve;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.ReplicationStream;
import org.apache.catalina.util.LifecycleSupport;
import org.apache.catalina.util.StringManager;
-import org.apache.catalina.core.StandardContext;
-import org.apache.catalina.tribes.io.ReplicationStream;
/**
* The DeltaManager manages replicated sessions by only replicating the deltas
@@ -73,15 +66,12 @@
public class DeltaManager extends ClusterManagerBase{
// ---------------------------------------------------- Security Classes
-
- public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
- .getLog(DeltaManager.class);
+ public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(DeltaManager.class);
/**
* The string manager for this package.
*/
- protected static StringManager sm = StringManager
- .getManager(Constants.Package);
+ protected static StringManager sm = StringManager.getManager(Constants.Package);
// ----------------------------------------------------- Instance Variables
@@ -99,11 +89,8 @@
* The descriptive name of this Manager implementation (for logging).
*/
protected static String managerName = "DeltaManager";
-
protected String name = null;
-
protected boolean defaultMode = false;
-
private CatalinaCluster cluster = null;
/**
@@ -120,77 +107,47 @@
* The maximum number of active Sessions allowed, or -1 for no limit.
*/
private int maxActiveSessions = -1;
-
private boolean expireSessionsOnShutdown = false;
-
private boolean notifyListenersOnReplication = true;
-
private boolean notifySessionListenersOnReplication = true;
-
private boolean stateTransfered = false ;
-
private int stateTransferTimeout = 60;
-
private boolean sendAllSessions = true;
-
private boolean sendClusterDomainOnly = true ;
-
private int sendAllSessionsSize = 1000 ;
/**
* wait time between send session block (default 2 sec)
*/
private int sendAllSessionsWaitTime = 2 * 1000 ;
-
private ArrayList receivedMessageQueue = new ArrayList() ;
-
private boolean receiverQueue = false ;
-
private boolean stateTimestampDrop = true ;
-
private long stateTransferCreateSendTime;
// ------------------------------------------------------------------ stats attributes
int rejectedSessions = 0;
-
private long sessionReplaceCounter = 0 ;
-
long processingTime = 0;
-
private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
-
private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
-
private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
-
private long counterReceive_EVT_SESSION_CREATED = 0 ;
-
private long counterReceive_EVT_SESSION_EXPIRED = 0;
-
private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
-
private long counterReceive_EVT_SESSION_DELTA = 0;
-
private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
-
private long counterSend_EVT_SESSION_CREATED = 0;
-
private long counterSend_EVT_SESSION_DELTA = 0 ;
-
private long counterSend_EVT_SESSION_ACCESSED = 0;
-
private long counterSend_EVT_SESSION_EXPIRED = 0;
-
private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
-
private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
-
private int counterNoStateTransfered = 0 ;
// ------------------------------------------------------------- Constructor
-
public DeltaManager() {
super();
}
@@ -203,9 +160,7 @@
* <code><description>/<version></code>.
*/
public String getInfo() {
-
- return (info);
-
+ return info;
}
public void setName(String name) {
@@ -216,9 +171,7 @@
* Return the descriptive short name of this Manager implementation.
*/
public String getName() {
-
- return (name);
-
+ return name;
}
/**
@@ -433,9 +386,7 @@
* Return the maximum number of active Sessions allowed, or -1 for no limit.
*/
public int getMaxActiveSessions() {
-
return (this.maxActiveSessions);
-
}
/**
@@ -445,12 +396,9 @@
* The new maximum number of sessions
*/
public void setMaxActiveSessions(int max) {
-
int oldMaxActiveSessions = this.maxActiveSessions;
this.maxActiveSessions = max;
- support.firePropertyChange("maxActiveSessions", new Integer(
- oldMaxActiveSessions), new Integer(this.maxActiveSessions));
-
+ support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions));
}
/**
@@ -492,8 +440,7 @@
/**
* @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
*/
- public void setNotifySessionListenersOnReplication(
- boolean notifyListenersCreateSessionOnReplication) {
+ public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) {
this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
}
@@ -510,8 +457,7 @@
return notifyListenersOnReplication;
}
- public void setNotifyListenersOnReplication(
- boolean notifyListenersOnReplication) {
+ public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
this.notifyListenersOnReplication = notifyListenersOnReplication;
}
@@ -546,7 +492,6 @@
* The associated Container
*/
public void setContainer(Container container) {
-
// De-register from the old Container (if any)
if ((this.container != null) && (this.container instanceof Context))
((Context) this.container).removePropertyChangeListener(this);
@@ -556,8 +501,7 @@
// Register with the new Container (if any)
if ((this.container != null) && (this.container instanceof Context)) {
- setMaxInactiveInterval(((Context) this.container)
- .getSessionTimeout() * 60);
+ setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
((Context) this.container).addPropertyChangeListener(this);
}
@@ -596,19 +540,16 @@
* @return The session
*/
public Session createSession(String sessionId, boolean distribute) {
-
if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
rejectedSessions++;
throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
}
-
DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
if (distribute) {
sendCreateSession(session.getId(), session);
}
if (log.isDebugEnabled())
log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
-
return (session);
}
@@ -626,8 +567,7 @@
null,
sessionId,
sessionId + "-" + System.currentTimeMillis());
- if (log.isDebugEnabled())
- log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
counterSend_EVT_SESSION_CREATED++;
send(msg);
}
@@ -671,8 +611,7 @@
* @throws ClassNotFoundException
* @throws IOException
*/
- protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data)
- throws ClassNotFoundException, IOException {
+ protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException {
ReplicationStream ois = getReplicationStream(data);
session.getDeltaRequest().readExternal(ois);
ois.close();
@@ -687,8 +626,7 @@
* @return serialized delta request
* @throws IOException
*/
- protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest)
- throws IOException {
+ protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
deltaRequest.writeExternal(oos);
@@ -706,8 +644,7 @@
* @exception IOException
* if an input/output error occurs
*/
- protected void deserializeSessions(byte[] data) throws ClassNotFoundException,
- IOException {
+ protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException {
// Initialize our internal data structures
//sessions.clear(); //should not do this
@@ -741,10 +678,7 @@
} else {
sessionReplaceCounter++;
// FIXME better is to grap this sessions again !
- if (log.isWarnEnabled())
- log.warn(sm.getString(
- "deltaManager.loading.existing.session",
- session.getIdInternal()));
+ if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
}
add(session);
}
@@ -757,14 +691,12 @@
} finally {
// Close the input stream
try {
- if (ois != null)
- ois.close();
+ if (ois != null) ois.close();
} catch (IOException f) {
// ignored
}
ois = null;
- if (originalLoader != null)
- Thread.currentThread().setContextClassLoader(originalLoader);
+ if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
}
}
@@ -820,9 +752,7 @@
* The listener to add
*/
public void addLifecycleListener(LifecycleListener listener) {
-
lifecycle.addLifecycleListener(listener);
-
}
/**
@@ -830,9 +760,7 @@
* Lifecycle has no listeners registered, a zero-length array is returned.
*/
public LifecycleListener[] findLifecycleListeners() {
-
return lifecycle.findLifecycleListeners();
-
}
/**
@@ -842,9 +770,7 @@
* The listener to remove
*/
public void removeLifecycleListener(LifecycleListener listener) {
-
lifecycle.removeLifecycleListener(listener);
-
}
/**
@@ -857,8 +783,7 @@
* component from being used
*/
public void start() throws LifecycleException {
- if (!initialized)
- init();
+ if (!initialized) init();
// Validate and update our current component state
if (started) {
@@ -875,6 +800,7 @@
//the channel is already running
Cluster cluster = getCluster() ;
// stop remove cluster binding
+ //wow, how many nested levels of if statements can we have ;)
if(cluster == null) {
Container context = getContainer() ;
if(context != null && context instanceof Context) {
@@ -888,7 +814,7 @@
if(engine != null && engine instanceof Engine) {
cluster = engine.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
- setCluster((CatalinaCluster) cluster) ;
+ setCluster((CatalinaCluster) cluster) ;
}
} else {
cluster = null ;
@@ -908,13 +834,10 @@
} else if( cluster.getContainer() instanceof Engine){
type = "Engine" ;
}
- log.info(sm
- .getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
+ log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
}
}
- if (log.isInfoEnabled())
- log.info(sm
- .getString("deltaManager.startClustering", getName()));
+ if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
//to survice context reloads, as only a stop/start is called, not
// createManager
((CatalinaCluster)cluster).addManager(getName(), this);
@@ -937,9 +860,7 @@
if(mbr == null) { // No domain member found
return;
}
- SessionMessage msg = new SessionMessageImpl(this.getName(),
- SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL",
- "GET-ALL-" + getName());
+ SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
// set reference time
stateTransferCreateSendTime = beforeSendTime ;
// request session state
@@ -951,37 +872,22 @@
receiverQueue = true ;
}
cluster.send(msg, mbr);
- if (log.isWarnEnabled())
- log.warn(sm.getString("deltaManager.waitForSessionState",
- getName(), mbr));
+ if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
// FIXME At sender ack mode this method check only the state transfer and resend is a problem!
waitForSendAllSessions(beforeSendTime);
} finally {
synchronized(receivedMessageQueue) {
- for (Iterator iter = receivedMessageQueue.iterator(); iter
- .hasNext();) {
+ for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
SessionMessage smsg = (SessionMessage) iter.next();
if (!stateTimestampDrop) {
- messageReceived(smsg,
- smsg.getAddress() != null ? (Member) smsg
- .getAddress() : null);
+ messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
- if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS
- && smsg.getTimestamp() >= stateTransferCreateSendTime) {
+ if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
// FIXME handle EVT_GET_ALL_SESSIONS later
- messageReceived(
- smsg,
- smsg.getAddress() != null ? (Member) smsg
- .getAddress()
- : null);
+ messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
if (log.isWarnEnabled()) {
- log.warn(sm.getString(
- "deltaManager.dropMessage",
- getName(), smsg
- .getEventTypeString(),
- new Date(stateTransferCreateSendTime), new Date(
- smsg.getTimestamp())));
+ log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
}
}
}
@@ -991,8 +897,7 @@
}
}
} else {
- if (log.isInfoEnabled())
- log.info(sm.getString("deltaManager.noMembers", getName()));
+ if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
}
}
@@ -1002,24 +907,22 @@
*/
protected void registerSessionAtReplicationValve(DeltaSession session) {
if(replicationValve == null) {
- if(container instanceof StandardContext
- && ((StandardContext)container).getCrossContext()) {
+ if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) {
Cluster cluster = getCluster() ;
if(cluster != null && cluster instanceof CatalinaCluster) {
Valve[] valves = ((CatalinaCluster)cluster).getValves();
if(valves != null && valves.length > 0) {
for(int i=0; replicationValve == null && i < valves.length ; i++ ){
- if(valves[i] instanceof ReplicationValve)
- replicationValve = (ReplicationValve)valves[i] ;
- }
+ if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ;
+ }//for
if(replicationValve == null && log.isDebugEnabled()) {
log.debug("no ReplicationValve found for CrossContext Support");
- }
- }
- }
- }
- }
+ }//endif
+ }//end if
+ }//endif
+ }//end if
+ }//end if
if(replicationValve != null) {
replicationValve.registerReplicationSession(session);
}
@@ -1036,19 +939,13 @@
if(isSendClusterDomainOnly()) {
for (int i = 0; mbr == null && i < mbrs.length; i++) {
Member member = mbrs[i];
- if(localMemberDomain.equals(member.getDomain()))
- mbr = member ;
+ if(localMemberDomain.equals(member.getDomain())) mbr = member ;
}
} else {
- if(mbrs.length != 0 )
- mbr = mbrs[0];
+ if(mbrs.length != 0 ) mbr = mbrs[0];
}
- if(mbr == null && log.isWarnEnabled())
- log.warn(sm.getString("deltaManager.noMasterMember",
- getName(), localMemberDomain));
- if(mbr != null && log.isDebugEnabled())
- log.warn(sm.getString("deltaManager.foundMasterMember",
- getName(), mbr));
+ if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), localMemberDomain));
+ if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
return mbr;
}
@@ -1066,6 +963,7 @@
try {
Thread.sleep(100);
} catch (Exception sleep) {
+ //
}
reqNow = System.currentTimeMillis();
isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
@@ -1084,12 +982,10 @@
}
if (isTimeout || (!getStateTransfered())) {
counterNoStateTransfered++ ;
- log.error(sm.getString("deltaManager.noSessionState",
- getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
+ log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
} else {
if (log.isInfoEnabled())
- log.info(sm.getString("deltaManager.sessionReceived",
- getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
+ log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
}
}
@@ -1110,14 +1006,12 @@
// Validate and update our current component state
if (!started)
- throw new LifecycleException(sm
- .getString("deltaManager.notStarted"));
+ throw new LifecycleException(sm.getString("deltaManager.notStarted"));
lifecycle.fireLifecycleEvent(STOP_EVENT, null);
started = false;
// Expire all active sessions
- if (log.isInfoEnabled())
- log.info(sm.getString("deltaManager.expireSessions", getName()));
+ if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
Session sessions[] = findSessions();
for (int i = 0; i < sessions.length; i++) {
DeltaSession session = (DeltaSession) sessions[i];
@@ -1155,11 +1049,9 @@
// Process a relevant property change
if (event.getPropertyName().equals("sessionTimeout")) {
try {
- setMaxInactiveInterval(((Integer) event.getNewValue())
- .intValue() * 60);
+ setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60);
} catch (NumberFormatException e) {
- log.error(sm.getString("deltaManager.sessionTimeout", event
- .getNewValue()));
+ log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));
}
}
@@ -1179,27 +1071,26 @@
if (cmsg != null && cmsg instanceof SessionMessage) {
SessionMessage msg = (SessionMessage) cmsg;
switch (msg.getEventType()) {
- case SessionMessage.EVT_GET_ALL_SESSIONS:
- case SessionMessage.EVT_SESSION_CREATED:
- case SessionMessage.EVT_SESSION_EXPIRED:
- case SessionMessage.EVT_SESSION_ACCESSED:
- case SessionMessage.EVT_SESSION_DELTA: {
- synchronized(receivedMessageQueue) {
- if(receiverQueue) {
- receivedMessageQueue.add(msg);
- return ;
+ case SessionMessage.EVT_GET_ALL_SESSIONS:
+ case SessionMessage.EVT_SESSION_CREATED:
+ case SessionMessage.EVT_SESSION_EXPIRED:
+ case SessionMessage.EVT_SESSION_ACCESSED:
+ case SessionMessage.EVT_SESSION_DELTA: {
+ synchronized(receivedMessageQueue) {
+ if(receiverQueue) {
+ receivedMessageQueue.add(msg);
+ return ;
+ }
}
+ break;
+ }
+ default: {
+ //we didn't queue, do nothing
+ break;
}
- break;
- }
- default: {
- //we didn't queue, do nothing
- break;
- }
} //switch
- messageReceived(msg, msg.getAddress() != null ? (Member) msg
- .getAddress() : null);
+ messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
}
@@ -1226,8 +1117,10 @@
counterSend_EVT_SESSION_DELTA++;
byte[] data = unloadDeltaRequest(deltaRequest);
msg = new SessionMessageImpl(getName(),
- SessionMessage.EVT_SESSION_DELTA, data, sessionId,
- sessionId + "-" + System.currentTimeMillis());
+ SessionMessage.EVT_SESSION_DELTA,
+ data,
+ sessionId,
+ sessionId + "-" + System.currentTimeMillis());
session.resetDeltaRequest();
}
}
@@ -1235,48 +1128,42 @@
if(!session.isPrimarySession()) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(),
- SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
- sessionId + "-" + System.currentTimeMillis());
+ SessionMessage.EVT_SESSION_ACCESSED,
+ null,
+ sessionId,
+ sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
- log.debug(sm.getString(
- "deltaManager.createMessage.accessChangePrimary",
- getName(), sessionId));
+ log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
}
}
} else { // log only outside synch block!
if (log.isDebugEnabled()) {
- log.debug(sm.getString(
- "deltaManager.createMessage.delta",
- getName(), sessionId));
+ log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
}
}
session.setPrimarySession(true);
//check to see if we need to send out an access message
if ((msg == null)) {
- long replDelta = System.currentTimeMillis()
- - session.getLastTimeReplicated();
+ long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
if (replDelta > (getMaxInactiveInterval() * 1000)) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(),
- SessionMessage.EVT_SESSION_ACCESSED, null,
- sessionId, sessionId + "-" + System.currentTimeMillis());
+ SessionMessage.EVT_SESSION_ACCESSED,
+ null,
+ sessionId,
+ sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
- log.debug(sm.getString(
- "deltaManager.createMessage.access", getName(),
- sessionId));
+ log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
}
}
}
//update last replicated time
- if (msg != null)
- session.setLastTimeReplicated(System.currentTimeMillis());
+ if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
return msg;
} catch (IOException x) {
- log.error(sm.getString(
- "deltaManager.createMessage.unableCreateDeltaRequest",
- sessionId), x);
+ log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
return null;
}
@@ -1329,12 +1216,8 @@
*/
protected void sessionExpired(String id) {
counterSend_EVT_SESSION_EXPIRED++ ;
- SessionMessage msg = new SessionMessageImpl(getName(),
- SessionMessage.EVT_SESSION_EXPIRED, null, id, id
- + "-EXPIRED-MSG");
- if (log.isDebugEnabled())
- log.debug(sm.getString("deltaManager.createMessage.expire",
- getName(), id));
+ SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
send(msg);
}
@@ -1348,8 +1231,7 @@
int expireDirect = 0 ;
int expireIndirect = 0 ;
- if(log.isDebugEnabled())
- log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
+ if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
for (int i = 0; i < sessions.length; i++) {
if (sessions[i] instanceof DeltaSession) {
DeltaSession session = (DeltaSession) sessions[i];
@@ -1359,13 +1241,12 @@
expireDirect++;
} else {
expireIndirect++;
- }
- }
- }
- }
+ }//end if
+ }//end if
+ }//end if
+ }//for
long timeEnd = System.currentTimeMillis();
- if(log.isDebugEnabled())
- log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
+ if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
}
@@ -1390,12 +1271,11 @@
boolean sameDomain= localMemberDomain.equals(sender.getDomain());
if (!sameDomain && log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
- new Object[] {getName(),
- msg.getEventTypeString(),
- sender,
- sender.getDomain(),
- localMemberDomain }
- ));
+ new Object[] {getName(),
+ msg.getEventTypeString(),
+ sender,
+ sender.getDomain(),
+ localMemberDomain }));
}
return sameDomain ;
}
@@ -1416,47 +1296,44 @@
return;
}
try {
- if (log.isDebugEnabled())
- log.debug(sm.getString("deltaManager.receiveMessage.eventType",
- getName(), msg.getEventTypeString(), sender));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
switch (msg.getEventType()) {
- case SessionMessage.EVT_GET_ALL_SESSIONS: {
- handleGET_ALL_SESSIONS(msg,sender);
- break;
- }
- case SessionMessage.EVT_ALL_SESSION_DATA: {
- handleALL_SESSION_DATA(msg,sender);
- break;
- }
- case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
- handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
- break;
- }
- case SessionMessage.EVT_SESSION_CREATED: {
- handleSESSION_CREATED(msg,sender);
- break;
- }
- case SessionMessage.EVT_SESSION_EXPIRED: {
- handleSESSION_EXPIRED(msg,sender);
- break;
- }
- case SessionMessage.EVT_SESSION_ACCESSED: {
- handleSESSION_ACCESSED(msg,sender);
- break;
- }
- case SessionMessage.EVT_SESSION_DELTA: {
- handleSESSION_DELTA(msg,sender);
- break;
- }
- default: {
- //we didn't recognize the message type, do nothing
- break;
- }
+ case SessionMessage.EVT_GET_ALL_SESSIONS: {
+ handleGET_ALL_SESSIONS(msg,sender);
+ break;
+ }
+ case SessionMessage.EVT_ALL_SESSION_DATA: {
+ handleALL_SESSION_DATA(msg,sender);
+ break;
+ }
+ case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
+ handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
+ break;
+ }
+ case SessionMessage.EVT_SESSION_CREATED: {
+ handleSESSION_CREATED(msg,sender);
+ break;
+ }
+ case SessionMessage.EVT_SESSION_EXPIRED: {
+ handleSESSION_EXPIRED(msg,sender);
+ break;
+ }
+ case SessionMessage.EVT_SESSION_ACCESSED: {
+ handleSESSION_ACCESSED(msg,sender);
+ break;
+ }
+ case SessionMessage.EVT_SESSION_DELTA: {
+ handleSESSION_DELTA(msg,sender);
+ break;
+ }
+ default: {
+ //we didn't recognize the message type, do nothing
+ break;
+ }
} //switch
} catch (Exception x) {
- log.error(sm.getString("deltaManager.receiveMessage.error",
- getName()), x);
+ log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
}
}
@@ -1470,10 +1347,7 @@
*/
protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.transfercomplete",
- getName(), sender.getHost(), new Integer(sender.getPort())));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));
stateTransferCreateSendTime = msg.getTimestamp() ;
stateTransfered = true ;
}
@@ -1485,15 +1359,12 @@
* @throws IOException
* @throws ClassNotFoundException
*/
- protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
- throws IOException, ClassNotFoundException {
+ protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
counterReceive_EVT_SESSION_DELTA++;
byte[] delta = msg.getSession();
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
- if (log.isDebugEnabled())
- log.debug(sm.getString("deltaManager.receiveMessage.delta",
- getName(), msg.getSessionID()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
DeltaRequest dreq = loadDeltaRequest(session, delta);
dreq.execute(session, notifyListenersOnReplication);
session.setPrimarySession(false);
@@ -1508,13 +1379,9 @@
*/
protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
counterReceive_EVT_SESSION_ACCESSED++;
- DeltaSession session = (DeltaSession) findSession(msg
- .getSessionID());
+ DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.accessed",
- getName(), msg.getSessionID()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
session.access();
session.setPrimarySession(false);
session.endAccess();
@@ -1529,13 +1396,9 @@
*/
protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {
counterReceive_EVT_SESSION_EXPIRED++;
- DeltaSession session = (DeltaSession) findSession(msg
- .getSessionID());
+ DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.expired",
- getName(), msg.getSessionID()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
session.expire(notifySessionListenersOnReplication, false);
}
}
@@ -1547,10 +1410,7 @@
*/
protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
counterReceive_EVT_SESSION_CREATED++;
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.createNewSession",
- getName(), msg.getSessionID()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
DeltaSession session = (DeltaSession) createEmptySession();
session.setManager(this);
session.setValid(true);
@@ -1574,16 +1434,10 @@
*/
protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
counterReceive_EVT_ALL_SESSION_DATA++;
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.allSessionDataBegin",
- getName()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
byte[] data = msg.getSession();
deserializeSessions(data);
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.allSessionDataAfter",
- getName()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
//stateTransferred = true;
}
@@ -1596,13 +1450,10 @@
* @param sender
* @throws IOException
*/
- protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender)
- throws IOException {
+ protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_GET_ALL_SESSIONS++;
//get a list of all the session from this manager
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.unloadingBegin", getName()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
// Write the number of active sessions, followed by the details
// get all sessions and serialize without sync
Session[] currentSessions = findSessions();
@@ -1611,13 +1462,10 @@
sendSessions(sender, currentSessions, findSessionTimestamp);
} else {
// send session at blocks
- int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length
- : getSendAllSessionsSize();
+ int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
- len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length
- - i
- : getSendAllSessionsSize();
+ len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions,findSessionTimestamp);
if (getSendAllSessionsWaitTime() > 0) {
@@ -1625,19 +1473,13 @@
Thread.sleep(getSendAllSessionsWaitTime());
} catch (Exception sleep) {
}
- }
- }
- }
+ }//end if
+ }//for
+ }//end if
- SessionMessage newmsg = new SessionMessageImpl(name,
- SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,
- "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"
- + getName());
+ SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
newmsg.setTimestamp(findSessionTimestamp);
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.createMessage.allSessionTransfered",
- getName()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
cluster.send(newmsg, sender);
}
@@ -1650,24 +1492,12 @@
* @param sendTimestamp
* @throws IOException
*/
- protected void sendSessions(Member sender, Session[] currentSessions,
- long sendTimestamp) throws IOException {
+ protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {
byte[] data = serializeSessions(currentSessions);
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.receiveMessage.unloadingAfter",
- getName()));
- SessionMessage newmsg = new SessionMessageImpl(name,
- SessionMessage.EVT_ALL_SESSION_DATA, data,
- "SESSION-STATE", "SESSION-STATE-" + getName());
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
+ SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());
newmsg.setTimestamp(sendTimestamp);
- //if(isSendSESSIONSTATEcompressed()) {
- // newmsg.setCompress(ClusterMessage.RESEND_ALLOWED);
- //}
- if (log.isDebugEnabled())
- log.debug(sm.getString(
- "deltaManager.createMessage.allSessionData",
- getName()));
+ if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
counterSend_EVT_ALL_SESSION_DATA++;
cluster.send(newmsg, sender);
}
Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=387267&r1=387266&r2=387267&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java (original)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Mon Mar 20 10:07:29 2006
@@ -48,7 +48,7 @@
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.ha.session.DeltaManager;
-import org.apache.catalina.tribes.util.IDynamicProperty;
+import org.apache.catalina.ha.util.IDynamicProperty;
import org.apache.catalina.util.LifecycleSupport;
import org.apache.catalina.util.StringManager;
import org.apache.commons.logging.Log;
@@ -353,7 +353,7 @@
* @return Member
*/
public Member getLocalMember() {
- return channel.getLocalMember();
+ return channel.getLocalMember(true);
}
// ------------------------------------------------------------- dynamic
@@ -779,11 +779,11 @@
msg.setAddress(getLocalMember());
if (dest != null) {
if (!getLocalMember().equals(dest)) {
- channel.send(new Member[] {dest}, msg);
+ channel.send(new Member[] {dest}, msg,0);
} else
log.error("Unable to send message to local member " + msg);
} else {
- channel.send(channel.getMembers(),msg);
+ channel.send(channel.getMembers(),msg,0);
}
} catch (Exception x) {
log.error("Unable to send message through cluster sender.", x);
Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java?rev=387267&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java Mon Mar 20 10:07:29 2006
@@ -0,0 +1,57 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.ha.util;
+
+import java.util.Iterator;
+
+/**
+ * @author Peter Rossbach
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+
+public interface IDynamicProperty {
+
+ /**
+ * Set a config attribute using reflection.
+ *
+ * @param name
+ * @param value
+ */
+ public void setProperty(String name, Object value) ;
+
+ /**
+ * Get the current value of a config property.
+ *
+ * @param key
+ * @return The property
+ */
+ public Object getProperty(String key) ;
+ /**
+ * Get all property keys.
+ *
+ * @return An iterator over the property names
+ */
+ public Iterator getPropertyNames() ;
+
+ /**
+ * Remove a configured property.
+ *
+ * @param key
+ */
+ public void removeProperty(String key) ;
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org