You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/12/15 22:33:06 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java

fhanik      2003/12/15 13:33:06

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/session
                        SimpleTcpReplicationManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationListener.java SimpleTcpCluster.java
                        SocketSender.java TcpReplicationThread.java
  Log:
  Implemented true synchronous replication. The system now awaits for an ack from the other server before returning the
  requested thread during replication. This is bug 25493
  
  Revision  Changes    Path
  1.17      +4 -4      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
  
  Index: SimpleTcpReplicationManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- SimpleTcpReplicationManager.java	15 Nov 2003 01:07:23 -0000	1.16
  +++ SimpleTcpReplicationManager.java	15 Dec 2003 21:33:06 -0000	1.17
  @@ -491,7 +491,7 @@
                       new SessionMessage(this.getName(),
                                          SessionMessage.EVT_GET_ALL_SESSIONS,
                                          null,
  -                                       null);
  +                                       "GET-ALL");
                   cluster.send(msg, mbr);
                   log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                            ". This operation will timeout if no session state has been received within "+
  
  
  
  1.5       +9 -6      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
  
  Index: ReplicationListener.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ReplicationListener.java	18 Apr 2003 02:51:24 -0000	1.4
  +++ ReplicationListener.java	15 Dec 2003 21:33:06 -0000	1.5
  @@ -91,13 +91,16 @@
       private java.net.InetAddress bind;
       private int port;
       private long timeout = 0;
  +    private boolean synchronous = false;
       public ReplicationListener(ListenCallback callback,
                                  int poolSize,
                                  java.net.InetAddress bind,
                                  int port,
  -                               long timeout)
  +                               long timeout,
  +                               boolean synchronous)
       {
           try {
  +            this.synchronous=synchronous;
               pool = new ThreadPool(poolSize, TcpReplicationThread.class);
           }catch ( Exception x ) {
               log.fatal("Unable to start thread pool for TCP listeners, session replication will fail! msg="+x.getMessage());
  @@ -155,7 +158,7 @@
                       SocketChannel channel = server.accept();
                       registerChannel (selector,
                                        channel,
  -                                     SelectionKey.OP_READ,
  +                                     SelectionKey.OP_READ | SelectionKey.OP_WRITE,
                                        new ObjectReader(channel,selector,callback));
                   }
                   // is there data to read on this channel?
  @@ -219,6 +222,6 @@
               return;
           }
           // invoking this wakes up the worker thread then returns
  -        worker.serviceChannel (key);
  +        worker.serviceChannel (key,synchronous);
       }
   }
  
  
  
  1.20      +6 -5      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -r1.19 -r1.20
  --- SimpleTcpCluster.java	16 Nov 2003 22:22:45 -0000	1.19
  +++ SimpleTcpCluster.java	15 Dec 2003 21:33:06 -0000	1.20
  @@ -480,7 +480,8 @@
                                           this.tcpThreadCount,
                                           this.tcpAddress,
                                           this.tcpPort,
  -                                        this.tcpSelectorTimeout);
  +                                        this.tcpSelectorTimeout,
  +                                        "synchronous".equals(this.replicationMode));
               mReplicationListener.setDaemon(true);
               mReplicationListener.start();
               mReplicationTransmitter = new ReplicationTransmitter(new IDataSender[0]);
  
  
  
  1.4       +21 -7     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
  
  Index: SocketSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SocketSender.java	15 Oct 2003 03:21:04 -0000	1.3
  +++ SocketSender.java	15 Dec 2003 21:33:06 -0000	1.4
  @@ -82,6 +82,7 @@
       private Socket sc = null;
       private boolean isSocketConnected = false;
       private boolean suspect;
  +    private long ackTimeout = 10000;
   
       public SocketSender(InetAddress host, int port)
       {
  @@ -102,6 +103,7 @@
       public void connect() throws java.io.IOException
       {
           sc = new Socket(getAddress(),getPort());
  +        sc.setSoTimeout((int)ackTimeout);
           isSocketConnected = true;
       }
   
  @@ -131,15 +133,27 @@
           try
           {
               sc.getOutputStream().write(data);
  +            sc.getOutputStream().flush();
  +            waitForAck(ackTimeout);
           }
           catch ( java.io.IOException x )
           {
               disconnect();
               connect();
               sc.getOutputStream().write(data);
  +            sc.getOutputStream().flush();
  +            waitForAck(ackTimeout);
           }
       }
  -    
  +
  +    private void waitForAck(long timeout)  throws java.io.IOException,
  +        java.net.SocketTimeoutException {
  +        int i = sc.getInputStream().read();
  +        while ( (i!=-1) && (i!=3) ) {
  +            i = sc.getInputStream().read();
  +        }
  +    }
  +
       public String toString() {
           StringBuffer buf = new StringBuffer("SocketSender[");
           buf.append(getAddress()).append(":").append(getPort()).append("]");
  @@ -148,14 +162,14 @@
       public boolean isSuspect() {
           return suspect;
       }
  -    
  +
       public boolean getSuspect() {
           return suspect;
       }
  -    
  +
       public void setSuspect(boolean suspect) {
           this.suspect = suspect;
       }
   
   
  -}
  \ No newline at end of file
  +}
  
  
  
  1.2       +27 -9     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TcpReplicationThread.java	18 Apr 2003 02:51:24 -0000	1.1
  +++ TcpReplicationThread.java	15 Dec 2003 21:33:06 -0000	1.2
  @@ -88,7 +88,7 @@
           org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
       private ByteBuffer buffer = ByteBuffer.allocate (1024);
       private SelectionKey key;
  -
  +    private boolean synchronous=false;
   
       TcpReplicationThread ()
       {
  @@ -112,8 +112,9 @@
               try {
                   drainChannel (key);
               } catch (Exception e) {
  -                log.info ("TCP Worker thread in cluster caught '"
  -                    + e + "' closing channel");
  +                log.error ("TCP Worker thread in cluster caught '"
  +                    + e + "' closing channel", e);
  +
                   // close channel and nudge selector
                   try {
                       key.channel().close();
  @@ -139,9 +140,10 @@
        * to ignore read-readiness for this channel while the
        * worker thread is servicing it.
        */
  -    synchronized void serviceChannel (SelectionKey key)
  +    synchronized void serviceChannel (SelectionKey key, boolean synchronous)
       {
           this.key = key;
  +        this.synchronous=synchronous;
           key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
           this.notify();		// awaken the thread
       }
  @@ -157,6 +159,7 @@
       void drainChannel (SelectionKey key)
           throws Exception
       {
  +        boolean packetReceived=false;
           SocketChannel channel = (SocketChannel) key.channel();
           int count;
           buffer.clear();			// make buffer empty
  @@ -164,11 +167,19 @@
           // loop while data available, channel is non-blocking
           while ((count = channel.read (buffer)) > 0) {
               buffer.flip();		// make buffer readable
  -            reader.append(buffer.array(),0,count);
  +            if (reader.append(buffer.array(),0,count)) {
  +                if (synchronous) {
  +                    sendAck(key,channel);
  +                } //end if
  +            }
               buffer.clear();		// make buffer empty
           }
           //check to see if any data is available
  -        reader.execute();
  +        if ( reader.execute() ) {
  +            if (synchronous) {
  +                sendAck(key,channel);
  +            }//end if
  +        }//end if
           if (count < 0) {
               // close channel on EOF, invalidates the key
               channel.close();
  @@ -178,5 +189,12 @@
           key.interestOps (key.interestOps() | SelectionKey.OP_READ);
           // cycle the selector so this key is active again
           key.selector().wakeup();
  +    }
  +
  +    private void sendAck(SelectionKey key, SocketChannel channel) throws java.io.IOException {
  +        //send a reply-acknowledgement
  +        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(new byte[] {6,2,3});
  +        channel.write(buf);
  +        buf.clear();
       }
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org