You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/09/10 00:21:19 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationValve.java

fhanik      2003/09/09 15:21:19

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/session
                        ReplicatedSession.java SessionMessage.java
                        SimpleTcpReplicationManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationValve.java
  Log:
  1. Added support for sending out access-time pings
  2. Added support for propagating session.invalidate to the other nodes in the cluster
  
  Revision  Changes    Path
  1.7       +24 -4     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicatedSession.java
  
  Index: ReplicatedSession.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicatedSession.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ReplicatedSession.java	26 Aug 2003 20:45:04 -0000	1.6
  +++ ReplicatedSession.java	9 Sep 2003 22:21:18 -0000	1.7
  @@ -101,6 +101,8 @@
   
       private transient Manager mManager = null;
       protected boolean isDirty = false;
  +    private transient long lastAccessWasDistributed = System.currentTimeMillis();
  +
       public ReplicatedSession(Manager manager) {
           super(manager);
           mManager = manager;
  @@ -118,7 +120,13 @@
       }
   
   
  -
  +    public void setLastAccessWasDistributed(long time) {
  +        lastAccessWasDistributed = time;
  +    }
  +    
  +    public long getLastAccessWasDistributed() {
  +        return lastAccessWasDistributed;
  +    }
   
   
       public void removeAttribute(String name) {
  @@ -178,7 +186,19 @@
           setIsDirty(true);
       }
   
  +    public void expire() {
  +        SimpleTcpReplicationManager mgr =(SimpleTcpReplicationManager)getManager();
  +        mgr.sessionInvalidated(getId());
  +        setIsDirty(true);
  +        super.expire();
  +    }
   
  +    public void invalidate() {
  +        SimpleTcpReplicationManager mgr =(SimpleTcpReplicationManager)getManager();
  +        mgr.sessionInvalidated(getId());
  +        setIsDirty(true);
  +        super.invalidate();
  +    }
   
   
       /**
  
  
  
  1.4       +9 -9      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SessionMessage.java
  
  Index: SessionMessage.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SessionMessage.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SessionMessage.java	24 Apr 2003 04:24:01 -0000	1.3
  +++ SessionMessage.java	9 Sep 2003 22:21:18 -0000	1.4
  @@ -116,18 +116,18 @@
        * Event type used when a session has expired, but we don't
        * want to notify the session listeners
        */
  -//    public static final int EVT_SESSION_EXPIRED_WONOTIFY = 2;
  +    public static final int EVT_SESSION_EXPIRED_WONOTIFY = 2;
       /**
        * Event type used when a session has expired, and we do
        * want to notify the session listeners
        */
  -//    public static final int EVT_SESSION_EXPIRED_WNOTIFY = 7;
  +    public static final int EVT_SESSION_EXPIRED_WNOTIFY = 7;
       /**
        * Event type used when a session has been accessed (ie, last access time
        * has been updated. This is used so that the replicated sessions will not expire
        * on the network
        */
  -//    public static final int EVT_SESSION_ACCESSED = 3;
  +    public static final int EVT_SESSION_ACCESSED = 3;
       /**
        * Event type used when a server comes online for the first time.
        * The first thing the newly started server wants to do is to grab the
  @@ -261,12 +261,12 @@
           switch (mEvtType)
           {
               case EVT_SESSION_CREATED : return "SESSION-MODIFIED";
  -//            case EVT_SESSION_EXPIRED_WNOTIFY : return "SESSION-EXPIRED-WITH-NOTIFY";
  -//            case EVT_SESSION_EXPIRED_WONOTIFY : return "SESSION-EXPIRED-WITHOUT-NOTIFY";
  +            case EVT_SESSION_EXPIRED_WNOTIFY : return "SESSION-EXPIRED-WITH-NOTIFY";
  +            case EVT_SESSION_EXPIRED_WONOTIFY : return "SESSION-EXPIRED-WITHOUT-NOTIFY";
   //            case EVT_ATTRIBUTE_ADDED : return "SESSION-ATTRIBUTE-ADDED";
   //            case EVT_ATTRIBUTE_REMOVED_WNOTIFY : return "SESSION-ATTRIBUTE-REMOVED-WITH-NOTIFY";
   //            case EVT_ATTRIBUTE_REMOVED_WONOTIFY: return "SESSION-ATTRIBUTE-REMOVED-WITHOUT-NOTIFY";
  -//            case EVT_SESSION_ACCESSED : return "SESSION-ACCESSED";
  +            case EVT_SESSION_ACCESSED : return "SESSION-ACCESSED";
               case EVT_GET_ALL_SESSIONS : return "SESSION-GET-ALL";
   //            case EVT_SET_SESSION_NOTE: return "SET-SESSION-NOTE";
   //            case EVT_SET_USER_PRINCIPAL : return "SET-USER-PRINCIPAL";
  
  
  
  1.10      +84 -19    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
  
  Index: SimpleTcpReplicationManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SimpleTcpReplicationManager.java	8 Jul 2003 06:30:45 -0000	1.9
  +++ SimpleTcpReplicationManager.java	9 Sep 2003 22:21:18 -0000	1.10
  @@ -145,6 +145,8 @@
       protected boolean distributable = true;
   
       protected org.apache.catalina.cluster.tcp.SimpleTcpCluster cluster;
  +    
  +    protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
       /**
        * Constructor, just calls super()
        *
  @@ -283,28 +285,75 @@
           add(session);
           return session;
       }
  -
  +    
  +    public void sessionInvalidated(String sessionId) {
  +        synchronized ( invalidatedSessions ) {
  +            invalidatedSessions.put(sessionId, sessionId);
  +        }
  +    }
  +    
  +    public String[] getInvalidatedSessions() {
  +        synchronized ( invalidatedSessions ) {
  +            String[] result = new String[invalidatedSessions.size()];
  +            invalidatedSessions.values().toArray(result);
  +            return result;
  +        }
  +        
  +    }
  +    
       public SessionMessage requestCompleted(String sessionId)
       {
           //notify javagroups
           try
           {
  -            ReplicatedSession session = (ReplicatedSession)findSession(sessionId);
  -            if (session != null)
  -            {
  -                //return immediately if the session is not dirty
  -                if ( useDirtyFlag && (!session.isDirty())) return null;
  -                session.setIsDirty(false);
  -                if (getDebug()>5) {
  -                    try {
  -                        log.debug("Sending session to cluster="+session);
  -                    }catch ( Exception ignore) {}
  -                }
  -                SessionMessage msg = new SessionMessage(name,SessionMessage.EVT_SESSION_CREATED,
  -                    writeSession(session),
  -                    session.getId());
  +            if ( invalidatedSessions.get(sessionId) != null ) {
  +                synchronized ( invalidatedSessions ) {
  +                    invalidatedSessions.remove(sessionId);
  +                    SessionMessage msg = new SessionMessage(name,
  +                    SessionMessage.EVT_SESSION_EXPIRED_WNOTIFY,
  +                    null,
  +                    sessionId);
                   return msg;
  -            } //end if
  +                }
  +            } else {
  +                ReplicatedSession session = (ReplicatedSession) findSession(
  +                    sessionId);
  +                if (session != null) {
  +                    //return immediately if the session is not dirty
  +                    if (useDirtyFlag && (!session.isDirty())) {
  +                        //but before we return doing nothing, 
  +                        //see if we should send
  +                        //an updated last access message so that
  +                        //sessions across cluster dont expire
  +                        long interval = session.getMaxInactiveInterval();
  +                        long lastaccdist = System.currentTimeMillis() - 
  +                            session.getLastAccessWasDistributed();
  +                        System.out.println("\nFH\ninterval="+interval+" lastaccdist="+lastaccdist);    
  +                        if ( ((interval*1000) / lastaccdist)< 3 ) {
  +                            SessionMessage accmsg = new SessionMessage(name,
  +                                SessionMessage.EVT_SESSION_ACCESSED,
  +                                null,
  +                                sessionId);
  +                            session.setLastAccessWasDistributed(System.currentTimeMillis());
  +                            return accmsg;
  +                        }
  +                        return null;
  +                    }
  +                        
  +                    session.setIsDirty(false);
  +                    if (getDebug() > 5) {
  +                        try {
  +                            log.debug("Sending session to cluster=" + session);
  +                        }
  +                        catch (Exception ignore) {}
  +                    }
  +                    SessionMessage msg = new SessionMessage(name,
  +                        SessionMessage.EVT_SESSION_CREATED,
  +                        writeSession(session),
  +                        session.getId());
  +                    return msg;
  +                } //end if
  +            }//end if
           }
           catch (Exception x )
           {
  @@ -518,6 +567,22 @@
                       session.setValid(true);
                       session.access();
                       if ( getDebug()  > 5 ) log("Received replicated session="+session);
  +                    break;
  +                }
  +                case SessionMessage.EVT_SESSION_EXPIRED_WNOTIFY:
  +                case SessionMessage.EVT_SESSION_EXPIRED_WONOTIFY: {
  +                    Session session = findSession(msg.getSessionID());
  +                    if ( session != null ) {
  +                        session.expire();
  +                        this.remove(session);
  +                    }//end if
  +                    break;
  +                }
  +                case SessionMessage.EVT_SESSION_ACCESSED :{
  +                    Session session = findSession(msg.getSessionID());
  +                    if ( session != null ) {
  +                        session.access();
  +                    }
                       break;
                   }
                   default:  {
  
  
  
  1.5       +27 -10    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
  
  Index: ReplicationValve.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ReplicationValve.java	24 Apr 2003 04:24:01 -0000	1.4
  +++ ReplicationValve.java	9 Sep 2003 22:21:19 -0000	1.5
  @@ -186,6 +186,27 @@
               HttpRequest hrequest = (HttpRequest) request;
               HttpServletRequest hreq = (HttpServletRequest) hrequest.getRequest();
               HttpSession session = hreq.getSession(false);
  +            SimpleTcpReplicationManager manager = (SimpleTcpReplicationManager)request.getContext().getManager();
  +            SimpleTcpCluster cluster = (SimpleTcpCluster)getContainer().getCluster();
  +            if ( cluster == null ) {
  +                log("No cluster configured for this request.",2);
  +                return;
  +            }
  +            //first check for session invalidations
  +            String[] invalidIds=manager.getInvalidatedSessions();
  +            if ( invalidIds.length > 0 ) {
  +                for ( int i=0;i<invalidIds.length; i++ ) {
  +                    try {
  +                        SessionMessage imsg = manager.requestCompleted(
  +                            invalidIds[i]);
  +                        if (imsg != null)
  +                            cluster.send(imsg);
  +                    }catch ( Exception x ) {
  +                        log("Unable to send session invalid message over cluster.",x,2);
  +                    }
  +                }
  +            }
  +            
               String id = null;
               if ( session != null )
                   id = session.getId();
  @@ -199,11 +220,7 @@
                    (!(request.getContext().getManager() instanceof SimpleTcpReplicationManager)))
                   return;
   
  -            SimpleTcpCluster cluster = (SimpleTcpCluster)getContainer().getCluster();
  -            if ( cluster == null ) {
  -                log("No cluster configured for this request.",2);
  -                return;
  -            }
  +            
   
               String uri = hrequest.getDecodedRequestURI();
               boolean filterfound = false;
  @@ -217,7 +234,7 @@
                   return;
   
               if ( debug > 4 ) log("Invoking replication request on "+uri,4);
  -            SimpleTcpReplicationManager manager = (SimpleTcpReplicationManager)request.getContext().getManager();
  +            
               SessionMessage msg = manager.requestCompleted(id);
               if ( msg == null ) return;