You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/02/15 10:31:46 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp DataSender.java AsyncSocketSender.java IDataSender.java PooledSocketSender.java ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java

pero        2005/02/15 01:31:46

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster
                        ClusterReceiver.java ClusterSender.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        AsyncSocketSender.java IDataSender.java
                        PooledSocketSender.java ReplicationListener.java
                        SimpleTcpCluster.java SocketSender.java
                        TcpReplicationThread.java
  Added:       modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        DataSender.java
  Log:
  Refactor all IDataSender and factor out a base class
  Add some statistics attributes
  i18n support to senders
  Add KeepAlive and Ack Handling to AsyncSocketSender
  
  Revision  Changes    Path
  1.3       +9 -2      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
  
  Index: ClusterReceiver.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ClusterReceiver.java	27 Feb 2004 14:58:55 -0000	1.2
  +++ ClusterReceiver.java	15 Feb 2005 09:31:45 -0000	1.3
  @@ -17,7 +17,13 @@
   package org.apache.catalina.cluster;
   
   
  -
  +/**
  + * 
  + * @author Filip Hanik
  + * @author Peter Rossbach
  + * @version 1.1
  + *
  + */
   public interface ClusterReceiver
   {
   
  @@ -27,7 +33,8 @@
   
       public void setCatalinaCluster(CatalinaCluster cluster);
       
  -    public void setIsSenderSynchronized(boolean isSenderSynchronized);
  +    public boolean isWaitForAck();
  +    public void setWaitForAck(boolean isWaitForAck);
       
       public String getHost();
       
  
  
  
  1.4       +9 -2      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java
  
  Index: ClusterSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ClusterSender.java	27 Dec 2004 09:30:36 -0000	1.3
  +++ ClusterSender.java	15 Feb 2005 09:31:45 -0000	1.4
  @@ -19,7 +19,13 @@
   import org.apache.catalina.cluster.tcp.SimpleTcpCluster;
   
   
  -
  +/**
  + * 
  + * @author Filip Hanik
  + * @author Peter Rossbach
  + * @version 1.1
  + *
  + */
   public interface ClusterSender
   {
   
  @@ -35,7 +41,8 @@
   
       public void sendMessage(String messageId, byte[] indata) throws java.io.IOException;
       
  -    public boolean getIsSenderSynchronized();
  +    public boolean isWaitForAck();
  +    public void setWaitForAck(boolean isWaitForAck);
   
       /**
        * @param cluster
  
  
  
  1.10      +157 -105  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
  
  Index: AsyncSocketSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- AsyncSocketSender.java	27 Dec 2004 09:30:36 -0000	1.9
  +++ AsyncSocketSender.java	15 Feb 2005 09:31:45 -0000	1.10
  @@ -17,173 +17,215 @@
   package org.apache.catalina.cluster.tcp;
   
   import java.net.InetAddress;
  -import java.net.Socket;
  +
   import org.apache.catalina.cluster.util.SmartQueue;
   
   /**
  - * Send cluster messages from a Message queue with only one socket.
  + * Send cluster messages from a Message queue with only one socket. Ack and
  + * keepAlive handling are supported.
  + * <ul>
  + * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
  + * sender and all messages are queued. Only use this for small maintenance
  + * issues!</li>
  + * <li>waitForAck=true means that the receiver acknowledges the transfer</li>
  + * <li>after one minute idle time, or number of request (100) the connection is
  + * reconnected with next request. Change this for production use!</li>
  + * <li>default ackTimeout is 15 sec: this is very low for the large all-session replication messages sent after a node restarts</li>
  + * <li>disable keepAlive: keepAliveTimeout="-1" and keepAliveMaxRequestCount="-1"</li>
  + * </ul>
    * 
    * @author Filip Hanik
    * @author Peter Rossbach
  - * @version 1.1
  + * @version 1.2
    */
  -public class AsyncSocketSender implements IDataSender {
  +public class AsyncSocketSender extends DataSender {
  +    
       private static int threadCounter = 1;
   
       private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
               .getLog(AsyncSocketSender.class);
   
  -    private InetAddress address;
  -
  -    private int port;
  -
  -    private Socket sc = null;
  +    /**
  +     * The descriptive information about this implementation.
  +     */
  +    private static final String info = "AsyncSocketSender/1.2";
   
  -    private boolean isSocketConnected = false;
  +    // ----------------------------------------------------- Instance Variables
   
  +    /**
  +     * Message Queue
  +     */
       private SmartQueue queue = new SmartQueue();
   
  -    private boolean suspect;
  -
  +    /**
  +     * Active thread to push messages asynchronous to the other replication node
  +     */
       private QueueThread queueThread = null;
   
  -    private long ackTimeout;
  -
  -    private long nrOfRequests = 0;
  -
  -    private long totalBytes = 0;
  -
  -    private synchronized void addStats(int length) {
  -        nrOfRequests++;
  -        totalBytes += length;
  -        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
  -            log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort()
  -                    + "Nr of bytes sent=" + totalBytes + " over "
  -                    + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
  -                    + " bytes/request");
  -        }
  -
  -    }
  +    /**
  +     * Count the number of queued messages
  +     */
  +    private long inQueueCounter = 0;
   
       /**
  -     * @return Returns the nrOfRequests.
  +     * Count all messages successfully pushed from the queue
        */
  -    public long getNrOfRequests() {
  -        return nrOfRequests;
  -    }
  +    private long outQueueCounter = 0;
   
       /**
  -     * @return Returns the totalBytes.
  +     * Current number of bytes from all queued messages
        */
  -    public long getTotalBytes() {
  -        return totalBytes;
  -    }
  +    private long queuedNrOfBytes = 0;
   
  +    // ------------------------------------------------------------- Constructor
  +
  +    /**
  +     * start background thread to push incoming cluster messages to replication
  +     * node
  +     * 
  +     * @param host replication node tcp address
  +     * @param port replication node tcp port
  +     */
       public AsyncSocketSender(InetAddress host, int port) {
  -        this.address = host;
  -        this.port = port;
  +        super(host, port);
           checkThread();
  -        if (log.isInfoEnabled())
  -            log.info("Started async sender thread for TCP replication.");
  +        long a = Long.MAX_VALUE;
       }
   
  -    public InetAddress getAddress() {
  -        return address;
  -    }
  +    // ------------------------------------------------------------- Properties
   
  -    public int getPort() {
  -        return port;
  -    }
  +    /**
  +     * Return descriptive information about this implementation and the
  +     * corresponding version number, in the format
  +     * <code>&lt;description&gt;/&lt;version&gt;</code>.
  +     */
  +    public String getInfo() {
   
  -    public void connect() throws java.io.IOException {
  -        sc = new Socket(getAddress(), getPort());
  -        isSocketConnected = true;
  -        checkThread();
  +        return (info);
   
       }
   
  -    protected void checkThread() {
  -        if (queueThread == null) {
  -            queueThread = new QueueThread(this);
  -            queueThread.setDaemon(true);
  -            queueThread.start();
  -        }
  +    /**
  +     * @return Returns the inQueueCounter.
  +     */
  +    public long getInQueueCounter() {
  +        return inQueueCounter;
       }
   
  -    public void disconnect() {
  -        try {
  -            sc.close();
  -        } catch (Exception x) {
  -        }
  -        isSocketConnected = false;
  -        if (queueThread != null) {
  -            queueThread.stopRunning();
  -            queueThread = null;
  -        }
  +    /**
  +     * @return Returns the outQueueCounter.
  +     */
  +    public long getOutQueueCounter() {
  +        return outQueueCounter;
  +    }
   
  +    /**
  +     * @return Returns the queueSize.
  +     */
  +    public int getQueueSize() {
  +        return queue.size();
       }
   
  -    public boolean isConnected() {
  -        return isSocketConnected;
  +    /**
  +     * @return Returns the queuedNrOfBytes.
  +     */
  +    public long getQueuedNrOfBytes() {
  +        return queuedNrOfBytes;
       }
   
  -    public int getQueueSize() {
  -        return queue.size();
  +    // --------------------------------------------------------- Public Methods
  +
  +    /*
  +     * Connect to socket and start background thread to push queued messages
  +     * 
  +     * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
  +     */
  +    public void connect() throws java.io.IOException {
  +        super.connect();
  +        checkThread();
       }
   
       /**
  -     * Blocking send
  +     * Disconnect socket and stop queue thread
        * 
  -     * @param data
  -     * @throws java.io.IOException
  +     * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
        */
  -    private synchronized void sendMessage(byte[] data)
  -            throws java.io.IOException {
  -        if (!isConnected())
  -            connect();
  -        try {
  -            sc.getOutputStream().write(data);
  -            sc.getOutputStream().flush();
  -        } catch (java.io.IOException x) {
  -            disconnect();
  -            connect();
  -            sc.getOutputStream().write(data);
  -            sc.getOutputStream().flush();
  -        }
  -        addStats(data.length);
  +    public void disconnect() {
  +        stopThread();
  +        super.disconnect();
       }
   
  -    public synchronized void sendMessage(String sessionId, byte[] data)
  +    /*
  +     * Send message to queue for later sending
  +     * 
  +     * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
  +     *      byte[])
  +     */
  +    public synchronized void sendMessage(String messageid, byte[] data)
               throws java.io.IOException {
  -        SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(sessionId, data);
  +        SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data);
           queue.add(entry);
  +        inQueueCounter++;
  +        queuedNrOfBytes += data.length;
  +        if (log.isTraceEnabled())
  +            log.trace(sm.getString("AsyncSocketSender.queue.message",
  +                    getAddress(), new Integer(getPort()), messageid, new Long(
  +                            data.length)));
  +    }
  +
  +    /*
  +     * Reset sender statistics
  +     */
  +    public synchronized void resetStatistics() {
  +        super.resetStatistics();
  +        inQueueCounter = queue.size();
  +        outQueueCounter = 0;
  +
       }
   
  +    /**
  +     * Name of this SocketSender
  +     */
       public String toString() {
           StringBuffer buf = new StringBuffer("AsyncSocketSender[");
           buf.append(getAddress()).append(":").append(getPort()).append("]");
           return buf.toString();
       }
   
  -    public boolean isSuspect() {
  -        return suspect;
  -    }
  +    // ------------------------------------------------------ Protected Methods
   
  -    public boolean getSuspect() {
  -        return suspect;
  +    /**
  +     * Start Queue thread as daemon
  +     */
  +    protected void checkThread() {
  +        if (queueThread == null) {
  +            if (log.isInfoEnabled())
  +                log.info(sm.getString("AsyncSocketSender.create.thread",
  +                        getAddress(), new Integer(getPort())));
  +            queueThread = new QueueThread(this);
  +            queueThread.setDaemon(true);
  +            queueThread.start();
  +        }
       }
   
  -    public void setSuspect(boolean suspect) {
  -        this.suspect = suspect;
  +    /**
  +     * stop queue worker thread
  +     */
  +    protected void stopThread() {
  +        if (queueThread != null) {
  +            queueThread.stopRunning();
  +            queueThread = null;
  +        }
       }
   
  -    public long getAckTimeout() {
  -        return ackTimeout;
  +    /*
  +     * Reduce queued message data size counter
  +     */
  +    protected void reduceQueuedCounter(int size) {
  +        queuedNrOfBytes -= size;
       }
   
  -    public void setAckTimeout(long ackTimeout) {
  -        this.ackTimeout = ackTimeout;
  -    }
  +    // -------------------------------------------------------- Inner Class
   
       private class QueueThread extends Thread {
           AsyncSocketSender sender;
  @@ -199,16 +241,26 @@
               keepRunning = false;
           }
   
  +        /**
  +         * Get one queued message and push it to the replication node
  +         * 
  +         * @see DataSender#pushMessage(String, byte[])
  +         */
           public void run() {
               while (keepRunning) {
                   SmartQueue.SmartEntry entry = sender.queue.remove(5000);
                   if (entry != null) {
  +                    int messagesize = 0;
                       try {
                           byte[] data = (byte[]) entry.getValue();
  -                        sender.sendMessage(data);
  +                        messagesize = data.length;
  +                        sender.pushMessage((String) entry.getKey(), data);
  +                        outQueueCounter++;
                       } catch (Exception x) {
  -                        log.warn("Unable to asynchronously send session w/ id="
  -                                + entry.getKey() + " message will be ignored.");
  +                        log.warn(sm.getString("AsyncSocketSender.send.error",
  +                                entry.getKey()));
  +                    } finally {
  +                        reduceQueuedCounter(messagesize);
                       }
                   }
               }
  
  
  
  1.6       +4 -0      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java
  
  Index: IDataSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- IDataSender.java	29 Sep 2004 18:23:55 -0000	1.5
  +++ IDataSender.java	15 Feb 2005 09:31:45 -0000	1.6
  @@ -36,4 +36,8 @@
       public void setSuspect(boolean suspect);
       public boolean getSuspect();
       public void setAckTimeout(long timeout);
  +    public long getAckTimeout();
  +    public boolean isWaitForAck();
  +    public void setWaitForAck(boolean isWaitForAck);
  +
   }
  
  
  
  1.9       +117 -133  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
  
  Index: PooledSocketSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- PooledSocketSender.java	27 Dec 2004 09:30:36 -0000	1.8
  +++ PooledSocketSender.java	15 Feb 2005 09:31:45 -0000	1.9
  @@ -15,8 +15,9 @@
    */
   
   package org.apache.catalina.cluster.tcp;
  -import java.net.InetAddress ;
  -import java.net.Socket;
  +
  +import java.io.IOException;
  +import java.net.InetAddress;
   import java.util.LinkedList;
   
   /**
  @@ -24,129 +25,99 @@
    * 
    * @author Filip Hanik
    * @author Peter Rossbach
  - * @version 1.1
  + * @version 1.2
    */
   
  +public class PooledSocketSender extends DataSender {
  +
  +    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
  +            .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);
   
  -public class PooledSocketSender implements IDataSender
  -{
  +    /**
  +     * The descriptive information about this implementation.
  +     */
  +    private static final String info = "PooledSocketSender/1.2";
   
  -    private static org.apache.commons.logging.Log log =
  -        org.apache.commons.logging.LogFactory.getLog( org.apache.catalina.cluster.CatalinaCluster.class );
  +    // ----------------------------------------------------- Instance Variables
   
  -    private InetAddress address;
  -    private int port;
  -    private Socket sc = null;
  -    private boolean isSocketConnected = true;
  -    private boolean suspect;
  -    private long ackTimeout = 15*1000;  //15 seconds socket read timeout (for acknowledgement)
  -    private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min
  -    private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting
  -    private long keepAliveConnectTime = 0;
  -    private int keepAliveCount = 0;
       private int maxPoolSocketLimit = 25;
   
       private SenderQueue senderQueue = null;
  -    private long nrOfRequests = 0;
   
  -    private long totalBytes = 0;
  -
  -    public PooledSocketSender(InetAddress host, int port)
  -    {
  -        this.address = host;
  -        this.port = port;
  -        senderQueue = new SenderQueue(this,maxPoolSocketLimit);
  -    }
  -
  -    private synchronized void addStats(int length) {
  -        nrOfRequests++;
  -        totalBytes += length;
  -        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
  -            log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort()
  -                    + "Nr of bytes sent=" + totalBytes + " over "
  -                    + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
  -                    + " bytes/request");
  -        }
  +    //  ----------------------------------------------------- Constructor
   
  +    public PooledSocketSender(InetAddress host, int port) {
  +        super(host, port);
  +        senderQueue = new SenderQueue(this, maxPoolSocketLimit);
       }
   
  -    /**
  -     * @return Returns the nrOfRequests.
  -     */
  -    public long getNrOfRequests() {
  -        return nrOfRequests;
  -    }
  +    //  ----------------------------------------------------- Public Properties
   
       /**
  -     * @return Returns the totalBytes.
  +     * Return descriptive information about this implementation and the
  +     * corresponding version number, in the format
  +     * <code>&lt;description&gt;/&lt;version&gt;</code>.
        */
  -    public long getTotalBytes() {
  -        return totalBytes;
  -    }
  +    public String getInfo() {
   
  +        return (info);
   
  -    public InetAddress getAddress()
  -    {
  -        return address;
  -    }
  -
  -    public int getPort()
  -    {
  -        return port;
       }
   
  -    public void connect() throws java.io.IOException
  -    {
  -        //do nothing, happens in the socket sender itself
  -        senderQueue.open();
  -        isSocketConnected = true;
  +    public void setMaxPoolSocketLimit(int limit) {
  +        maxPoolSocketLimit = limit;
  +        senderQueue.setLimit(limit);
       }
   
  -    public void disconnect()
  -    {
  -        senderQueue.close();
  -        isSocketConnected = false;
  +    public int getMaxPoolSocketLimit() {
  +        return maxPoolSocketLimit;
       }
   
  -    public boolean isConnected()
  -    {
  -        return isSocketConnected;
  +    public int getInPoolSize() {
  +        return senderQueue.getInPoolSize();
       }
   
  -    public void setAckTimeout(long timeout) {
  -        this.ackTimeout = timeout;
  +    public int getInUsePoolSize() {
  +        return senderQueue.getInUsePoolSize();
       }
   
  -    public long getAckTimeout() {
  -        return ackTimeout;
  -    }
  +    //  ----------------------------------------------------- Public Methods
   
  -    public void setMaxPoolSocketLimit(int limit) {
  -        maxPoolSocketLimit = limit;
  +    public void connect() throws java.io.IOException {
  +        //do nothing, happens in the socket sender itself
  +        senderQueue.open();
  +        setSocketConnected(true);
  +        connectCounter++;
       }
   
  -    public int getMaxPoolSocketLimit() {
  -        return maxPoolSocketLimit;
  +    public void disconnect() {
  +        senderQueue.close();
  +        setSocketConnected(false);
  +        disconnectCounter++;
       }
   
  -
       /**
  -     * Blocking send
  -     * @param data
  +     * send Message and use a pool of SocketSenders
  +     * 
  +     * @param messageId Message unique identifier
  +     * @param data Message data
        * @throws java.io.IOException
        */
  -    public void sendMessage(String sessionId, byte[] data) throws java.io.IOException
  -    {
  +    public void sendMessage(String messageId, byte[] data) throws IOException {
           //get a socket sender from the pool
           SocketSender sender = senderQueue.getSender(0);
  -        if ( sender == null ) {
  -            log.warn("No socket sender available for client="+this.getAddress()+":"+this.getPort()+" did it disappear?");
  +        if (sender == null) {
  +            log.warn(sm.getString("PoolSocketSender.noMoreSender", this
  +                    .getAddress(), new Integer(this.getPort())));
               return;
  -        }//end if
  +        }
           //send the message
  -        sender.sendMessage(sessionId,data);
  -        //return the connection to the pool
  -        senderQueue.returnSender(sender);
  +        try {
  +            sender.sendMessage(messageId, data);
  +        } finally {
  +            //return the connection to the pool
  +            senderQueue.returnSender(sender);
  +        }
           addStats(data.length);
       }
   
  @@ -156,46 +127,19 @@
           return buf.toString();
       }
   
  -    public boolean getSuspect() {
  -        return suspect;
  -    }
  -
  -    public void setSuspect(boolean suspect) {
  -        this.suspect = suspect;
  -    }
  -
  -    public long getKeepAliveTimeout() {
  -        return keepAliveTimeout;
  -    }
  -    public void setKeepAliveTimeout(long keepAliveTimeout) {
  -        this.keepAliveTimeout = keepAliveTimeout;
  -    }
  -    public int getKeepAliveMaxRequestCount() {
  -        return keepAliveMaxRequestCount;
  -    }
  -    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
  -        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
  -    }
  -
  -    /**
  -     * @return Returns the keepAliveConnectTime.
  -     */
  -    public long getKeepAliveConnectTime() {
  -        return keepAliveConnectTime;
  -    }
  -    /**
  -     * @return Returns the keepAliveCount.
  -     */
  -    public int getKeepAliveCount() {
  -        return keepAliveCount;
  -    }
  +    //  ----------------------------------------------------- Inner Class
   
       private class SenderQueue {
           private int limit = 25;
  +
           PooledSocketSender parent = null;
  +
           private LinkedList queue = new LinkedList();
  +
           private LinkedList inuse = new LinkedList();
  +
           private Object mutex = new Object();
  +
           private boolean isOpen = true;
   
           public SenderQueue(PooledSocketSender parent, int limit) {
  @@ -203,30 +147,67 @@
               this.parent = parent;
           }
   
  +        /**
  +         * @return Returns the limit.
  +         */
  +        public int getLimit() {
  +            return limit;
  +        }
  +        /**
  +         * @param limit The limit to set.
  +         */
  +        public void setLimit(int limit) {
  +            this.limit = limit;
  +        }
  +        /**
  +         * @return
  +         */
  +        public int getInUsePoolSize() {
  +            return inuse.size();
  +        }
  +
  +        /**
  +         * @return
  +         */
  +        public int getInPoolSize() {
  +            return queue.size();
  +        }
  +
           public SocketSender getSender(long timeout) {
               SocketSender sender = null;
               long start = System.currentTimeMillis();
               long delta = 0;
               do {
                   synchronized (mutex) {
  -                    if ( !isOpen ) throw new IllegalStateException("Socket pool is closed.");
  -                    if ( queue.size() > 0 ) {
  +                    if (!isOpen)
  +                        throw new IllegalStateException(
  +                                "Socket pool is closed.");
  +                    if (queue.size() > 0) {
                           sender = (SocketSender) queue.removeFirst();
  -                    } else if ( inuse.size() < limit ) {
  +                    } else if (inuse.size() < limit) {
                           sender = getNewSocketSender();
                       } else {
                           try {
                               mutex.wait(timeout);
  -                        }catch ( Exception x ) {
  -                            PooledSocketSender.log.warn("PoolSocketSender.senderQueue.getSender failed",x);
  +                        } catch (Exception x) {
  +                            PooledSocketSender.log
  +                                    .warn(
  +                                            sm
  +                                                    .getString(
  +                                                            "PoolSocketSender.senderQueue.sender.failed",
  +                                                            parent.getAddress(),
  +                                                            new Integer(parent
  +                                                                    .getPort())),
  +                                            x);
                           }//catch
                       }//end if
  -                    if ( sender != null ) {
  +                    if (sender != null) {
                           inuse.add(sender);
                       }
                   }//synchronized
                   delta = System.currentTimeMillis() - start;
  -            } while ( (isOpen) && (sender == null) && (timeout==0?true:(delta<timeout)) );
  +            } while ((isOpen) && (sender == null)
  +                    && (timeout == 0 ? true : (delta < timeout)));
               //to do
               return sender;
           }
  @@ -242,21 +223,24 @@
   
           private SocketSender getNewSocketSender() {
               //new SocketSender(
  -            SocketSender sender = new SocketSender(parent.getAddress(),parent.getPort());
  -            sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount());
  +            SocketSender sender = new SocketSender(parent.getAddress(), parent
  +                    .getPort());
  +            sender.setKeepAliveMaxRequestCount(parent
  +                    .getKeepAliveMaxRequestCount());
               sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
               sender.setAckTimeout(parent.getAckTimeout());
  +            sender.setWaitForAck(parent.isWaitForAck());
               return sender;
   
           }
   
           public void close() {
               synchronized (mutex) {
  -                for ( int i=0; i<queue.size(); i++ ) {
  -                    SocketSender sender = (SocketSender)queue.get(i);
  +                for (int i = 0; i < queue.size(); i++) {
  +                    SocketSender sender = (SocketSender) queue.get(i);
                       sender.disconnect();
                   }//for
  -                for ( int i=0; i<inuse.size(); i++ ) {
  +                for (int i = 0; i < inuse.size(); i++) {
                       SocketSender sender = (SocketSender) inuse.get(i);
                       sender.disconnect();
                   }//for
  @@ -266,7 +250,7 @@
                   mutex.notifyAll();
               }
           }
  -        
  +
           public void open() {
               synchronized (mutex) {
                   isOpen = true;
  @@ -274,4 +258,4 @@
               }
           }
       }
  -}
  +}
  \ No newline at end of file
  
  
  
  1.19      +6 -6      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
  
  Index: ReplicationListener.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
  retrieving revision 1.18
  retrieving revision 1.19
  diff -u -r1.18 -r1.19
  --- ReplicationListener.java	27 Dec 2004 09:30:36 -0000	1.18
  +++ ReplicationListener.java	15 Feb 2005 09:31:45 -0000	1.19
  @@ -45,7 +45,7 @@
       private int tcpThreadCount;
       private long tcpSelectorTimeout;
       private int tcpListenPort;
  -    private boolean isSenderSynchronized;
  +    private boolean waitForAck;
       private Selector selector = null;
       
       private Object interestOpsMutex = new Object();
  @@ -221,7 +221,7 @@
               return;
           } else {
               // invoking this wakes up the worker thread then returns
  -            worker.serviceChannel(key, isSenderSynchronized);
  +            worker.serviceChannel(key, waitForAck);
               return;
           }
       }
  @@ -249,11 +249,11 @@
       public void setTcpThreadCount(int tcpThreadCount) {
           this.tcpThreadCount = tcpThreadCount;
       }
  -    public boolean getIsSenderSynchronized() {
  -        return isSenderSynchronized;
  +    public boolean isWaitForAck() {
  +        return waitForAck;
       }
  -    public void setIsSenderSynchronized(boolean isSenderSynchronized) {
  -        this.isSenderSynchronized = isSenderSynchronized;
  +    public void setWaitForAck(boolean waitForAck) {
  +        this.waitForAck = waitForAck;
       }
       
       public String getHost() {
  
  
  
  1.58      +2 -3      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.57
  retrieving revision 1.58
  diff -u -r1.57 -r1.58
  --- SimpleTcpCluster.java	27 Dec 2004 09:30:36 -0000	1.57
  +++ SimpleTcpCluster.java	15 Feb 2005 09:31:45 -0000	1.58
  @@ -389,8 +389,7 @@
   
               }
               registerMBeans();
  -            clusterReceiver.setIsSenderSynchronized(clusterSender
  -                    .getIsSenderSynchronized());
  +            clusterReceiver.setWaitForAck(clusterSender.isWaitForAck());
               clusterReceiver.setCatalinaCluster(this);
               clusterReceiver.start();
               clusterSender.setCatalinaCluster(this);
  
  
  
  1.15      +18 -187   jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
  
  Index: SocketSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- SocketSender.java	27 Dec 2004 09:30:36 -0000	1.14
  +++ SocketSender.java	15 Feb 2005 09:31:45 -0000	1.15
  @@ -15,176 +15,42 @@
    */
   
   package org.apache.catalina.cluster.tcp;
  -import java.net.InetAddress ;
  -import java.net.Socket;
  +
  +import java.net.InetAddress;
   
   /**
    * Send cluster messages sync to request with only one socket.
    * 
    * @author Filip Hanik
    * @author Peter Rossbach
  - * @version 1.1
  + * @version 1.2
    */
   
  -public class SocketSender implements IDataSender
  -{
  -
  -    private static org.apache.commons.logging.Log log =
  -        org.apache.commons.logging.LogFactory.getLog( SocketSender.class );
  -
  -    private InetAddress address;
  -    private int port;
  -    private Socket sc = null;
  -    private boolean isSocketConnected = false;
  -    /**
  -     * Flag socket as suspect
  -     */
  -    private boolean suspect;
  -    /**
  -     * 15 seconds socket read timeout (for acknowledgement)
  -     */
  -    private long ackTimeout = 15*1000; 
  -    /**
  -     * keep socket open for no more than one min 
  -     */
  -    private long keepAliveTimeout = 60*1000; 
  -    
  -    /**
  -     *   max 100 requests before reconnecting
  -     */
  -    private int keepAliveMaxRequestCount = 100; 
  -    
  -    private long keepAliveConnectTime = 0;
  -    private int keepAliveCount = 0;
  -
  -    private long nrOfRequests = 0;
  -
  -    private long totalBytes = 0;
  -
  -    private synchronized void addStats(int length) {
  -        nrOfRequests++;
  -        totalBytes += length;
  -        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
  -            log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort()
  -                    + "Nr of bytes sent=" + totalBytes + " over "
  -                    + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
  -                    + " bytes/request");
  -        }
  -
  -    }
  -
  -    /**
  -     * get number of messages that send 
  -     * @return Returns the nrOfRequests.
  -     */
  -    public long getNrOfRequests() {
  -        return nrOfRequests;
  -    }
  +public class SocketSender extends DataSender {
  +    // ----------------------------------------------------- Instance Variables
   
       /**
  -     * get total num bytes send with this socket.
  -     * @return Returns the totalBytes.
  +     * The descriptive information about this implementation.
        */
  -    public long getTotalBytes() {
  -        return totalBytes;
  -    }
  -    
  -    public SocketSender(InetAddress host, int port)
  -    {
  -        this.address = host;
  -        this.port = port;
  -    }
  -
  -    public InetAddress getAddress()
  -    {
  -        return address;
  -    }
  -
  -    public int getPort()
  -    {
  -        return port;
  -    }
  -
  -    public void connect() throws java.io.IOException
  -    {
  -        sc = new Socket(getAddress(),getPort());
  -        sc.setSoTimeout((int)ackTimeout);
  -        isSocketConnected = true;
  -        this.keepAliveCount = 0;
  -        this.keepAliveConnectTime = System.currentTimeMillis();
  -    }
  -
  -    public void disconnect()
  -    {
  -        try
  -        {
  -            sc.close();
  -        }catch ( Exception x)
  -        {}
  -        isSocketConnected = false;
  -    }
  +    private static final String info = "SocketSender/1.2";
   
  -    public boolean isConnected()
  -    {
  -        return isSocketConnected;
  -    }
  -
  -    public void checkIfDisconnect() {
  -        long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
  -        if ( (ctime > this.keepAliveTimeout) ||
  -             (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) {
  -            disconnect();
  -        }
  -    }
  +    // ------------------------------------------------------------- Constructor
   
  -    public void setAckTimeout(long timeout) {
  -        this.ackTimeout = timeout;
  +    public SocketSender(InetAddress host, int port) {
  +        super(host, port);
       }
   
  -    public long getAckTimeout() {
  -        return ackTimeout;
  -    }
  +    // ------------------------------------------------------------- Properties
   
       /**
  -     * send with only one socket at a time
  -     * @param sessionid unique message id
  -     * @param data data to send
  -     * @throws java.io.IOException
  +     * Return descriptive information about this implementation and the
  +     * corresponding version number, in the format
  +     * <code>&lt;description&gt;/&lt;version&gt;</code>.
        */
  -    public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException
  -    {
  -        checkIfDisconnect();
  -        if ( !isConnected() ) connect();
  -        try
  -        {
  -            sc.getOutputStream().write(data);
  -            sc.getOutputStream().flush();
  -            waitForAck(ackTimeout);
  -        }
  -        catch ( java.io.IOException x )
  -        {
  -            disconnect();
  -            connect();
  -            sc.getOutputStream().write(data);
  -            sc.getOutputStream().flush();
  -            waitForAck(ackTimeout);
  -        }
  -        this.keepAliveCount++;
  -        checkIfDisconnect();
  -        addStats(data.length);
  -    }
  +    public String getInfo() {
  +
  +        return (info);
   
  -    private void waitForAck(long timeout)  throws java.io.IOException {
  -        try {
  -            int i = sc.getInputStream().read();
  -            while ( (i != -1) && (i != 3)) {
  -                i = sc.getInputStream().read();
  -            }
  -        } catch (java.net.SocketTimeoutException x ) {
  -            log.warn("Wasn't able to read acknowledgement from server["+getAddress()+":"+getPort()+"] in "+this.ackTimeout+" ms."+
  -                     " Disconnecting socket, and trying again.");
  -            throw x;
  -        }
       }
   
       public String toString() {
  @@ -192,40 +58,5 @@
           buf.append(getAddress()).append(":").append(getPort()).append("]");
           return buf.toString();
       }
  -    public boolean isSuspect() {
  -        return suspect;
  -    }
  -
  -    public boolean getSuspect() {
  -        return suspect;
  -    }
   
  -    public void setSuspect(boolean suspect) {
  -        this.suspect = suspect;
  -    }
  -    public long getKeepAliveTimeout() {
  -        return keepAliveTimeout;
  -    }
  -    public void setKeepAliveTimeout(long keepAliveTimeout) {
  -        this.keepAliveTimeout = keepAliveTimeout;
  -    }
  -    public int getKeepAliveMaxRequestCount() {
  -        return keepAliveMaxRequestCount;
  -    }
  -    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
  -        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
  -    }
  -
  -    /**
  -     * @return Returns the keepAliveConnectTime.
  -     */
  -    public long getKeepAliveConnectTime() {
  -        return keepAliveConnectTime;
  -    }
  -    /**
  -     * @return Returns the keepAliveCount.
  -     */
  -    public int getKeepAliveCount() {
  -        return keepAliveCount;
  -    }
  -}
  +}
  \ No newline at end of file
  
  
  
  1.13      +5 -4      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- TcpReplicationThread.java	13 Jul 2004 09:43:58 -0000	1.12
  +++ TcpReplicationThread.java	15 Feb 2005 09:31:45 -0000	1.13
  @@ -39,7 +39,7 @@
           org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
       private ByteBuffer buffer = ByteBuffer.allocate (1024);
       private SelectionKey key;
  -    private boolean synchronous=false;
  +    private boolean waitForAck=true;
   
       TcpReplicationThread ()
       {
  @@ -91,10 +91,10 @@
        * to ignore read-readiness for this channel while the
        * worker thread is servicing it.
        */
  -    synchronized void serviceChannel (SelectionKey key, boolean synchronous)
  +    synchronized void serviceChannel (SelectionKey key, boolean waitForAck)
       {
           this.key = key;
  -        this.synchronous=synchronous;
  +        this.waitForAck=waitForAck;
           key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
           key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
           this.notify();		// awaken the thread
  @@ -125,11 +125,12 @@
           //check to see if any data is available
           int pkgcnt = reader.execute();
           while ( pkgcnt > 0 ) {
  -            if (synchronous) {
  +            if (waitForAck) {
                   sendAck(key,channel);
               } //end if
               pkgcnt--;
           }
  +        
           if (count < 0) {
               // close channel on EOF, invalidates the key
               channel.close();
  
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
  
  Index: DataSender.java
  ===================================================================
  /*
   * Copyright 1999,2005 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License"); you may not
   * use this file except in compliance with the License. You may obtain a copy of
   * the License at
   * 
   * http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   * License for the specific language governing permissions and limitations under
   * the License.
   */
  
  package org.apache.catalina.cluster.tcp;
  
  import java.io.IOException;
  import java.net.InetAddress;
  import java.net.Socket;
  import java.net.SocketException;
  
  import org.apache.catalina.util.StringManager;
  
  /**
   * Sends cluster messages over a single socket. Acknowledgement and keep-alive
   * handling are supported.
   * 
   * @author Peter Rossbach
   * @author Filip Hanik
   * @version 1.2
   */
  public class DataSender implements IDataSender {
  
      private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
              .getLog(DataSender.class);
  
      /**
       * The string manager for this package.
       */
      protected static StringManager sm = StringManager
              .getManager(Constants.Package);
  
      // ----------------------------------------------------- Instance Variables
  
      /**
       * The descriptive information about this implementation.
       */
      private static final String info = "DataSender/1.2";
  
      private InetAddress address;
  
      private int port;
  
      private Socket sc = null;
  
      private boolean isSocketConnected = false;
  
      private boolean suspect;
  
      private long ackTimeout;
  
      protected long nrOfRequests = 0;
  
      protected long totalBytes = 0;
  
      protected long connectCounter = 0;
  
      protected long disconnectCounter = 0;
  
      protected long missingAckCounter = 0;
  
      protected long dataResendCounter = 0;
  
      /**
       * keep socket open for no more than one min
       */
      private long keepAliveTimeout = 60 * 1000;
  
      /**
       * max 100 requests before reconnecting
       */
      private int keepAliveMaxRequestCount = 100;
  
      /**
       * Last connect timestamp
       */
      private long keepAliveConnectTime = 0;
  
      /**
       * keepalive counter
       */
      private int keepAliveCount = 0;
  
      private boolean waitForAck = true;
  
      private int socketCloseCounter;
  
      private int socketOpenCounter;
  
      // ------------------------------------------------------------- Constructor
  
      /**
       * Create a sender for one remote cluster member. Only the target address
       * and port are stored (and an info message is logged); no socket is opened
       * here.
       * 
       * @param host
       *            address of the remote member
       * @param port
       *            TCP port of the remote member
       */
      public DataSender(InetAddress host, int port) {
          this.address = host;
          this.port = port;
          if (log.isInfoEnabled()) {
              Integer portValue = new Integer(port);
              log.info(sm.getString("IDataSender.create", host, portValue));
          }
      }
  
      // ------------------------------------------------------------- Properties
  
      /**
       * Describe this implementation as
       * <code>&lt;description&gt;/&lt;version&gt;</code>.
       * 
       * @return the static descriptive information string of this class
       */
      public String getInfo() {
          return info;
      }
  
      /**
       * @return Returns the nrOfRequests.
       */
      public long getNrOfRequests() {
          return nrOfRequests;
      }
  
      /**
       * @return Returns the totalBytes.
       */
      public long getTotalBytes() {
          return totalBytes;
      }
  
      /**
       * @return Returns the connectCounter.
       */
      public long getConnectCounter() {
          return connectCounter;
      }
  
      /**
       * @return Returns the disconnectCounter.
       */
      public long getDisconnectCounter() {
          return disconnectCounter;
      }
  
      /**
       * @return Returns the missingAckCounter.
       */
      public long getMissingAckCounter() {
          return missingAckCounter;
      }
  
      /**
       * @return Returns the socketOpenCounter.
       */
      public int getSocketOpenCounter() {
          return socketOpenCounter;
      }
      
      /**
       * @return Returns the socketCloseCounter.
       */
      public int getSocketCloseCounter() {
          return socketCloseCounter;
      }
  
      /**
       * @return Returns the dataResendCounter.
       */
      public long getDataResendCounter() {
          return dataResendCounter;
      }
  
      public InetAddress getAddress() {
          return address;
      }
  
      public int getPort() {
          return port;
      }
  
      public boolean isConnected() {
          return isSocketConnected;
      }
  
      /**
       * @param isSocketConnected
       *            The isSocketConnected to set.
       */
      protected void setSocketConnected(boolean isSocketConnected) {
          this.isSocketConnected = isSocketConnected;
      }
  
      public boolean isSuspect() {
          return suspect;
      }
  
      public boolean getSuspect() {
          return suspect;
      }
  
      public void setSuspect(boolean suspect) {
          this.suspect = suspect;
      }
  
      public long getAckTimeout() {
          return ackTimeout;
      }
  
      /**
       * Set the acknowledgement timeout. The value is only stored here; it is
       * applied as the socket read timeout the next time the socket is opened
       * (and only when waitForAck is enabled), so an already open socket keeps
       * its previous timeout.
       * 
       * NOTE(review): this class declares no default, so the field starts at 0,
       * which java.net.Socket#setSoTimeout treats as "wait forever" - confirm
       * that callers always configure a value.
       * 
       * @param ackTimeout
       *            timeout in milliseconds
       */
      public void setAckTimeout(long ackTimeout) {
          this.ackTimeout = ackTimeout;
      }
  
      public long getKeepAliveTimeout() {
          return keepAliveTimeout;
      }
  
      /**
       * Set the maximum age of a socket before it is closed and reopened.
       * The age is compared in milliseconds against the time of the last socket
       * open; a value of -1 (or less) disables the age check.
       * 
       * @param keepAliveTimeout
       *            maximum socket age in milliseconds, or -1 to disable
       */
      public void setKeepAliveTimeout(long keepAliveTimeout) {
          this.keepAliveTimeout = keepAliveTimeout;
      }
  
      public int getKeepAliveMaxRequestCount() {
          return keepAliveMaxRequestCount;
      }
  
      /**
       * Set how many messages may be sent over one socket before it is closed
       * and reopened. A value of -1 (or less) disables the request-count check.
       * 
       * @param keepAliveMaxRequestCount
       *            maximum number of messages per socket, or -1 to disable
       */
      public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
          this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
      }
  
      /**
       * @return Returns the keepAliveConnectTime.
       */
      public long getKeepAliveConnectTime() {
          return keepAliveConnectTime;
      }
  
      /**
       * @return Returns the keepAliveCount.
       */
      public int getKeepAliveCount() {
          return keepAliveCount;
      }
  
      /**
       * @return Returns the waitForAck.
       */
      public boolean isWaitForAck() {
          return waitForAck;
      }
  
      /**
       * @param waitForAck
       *            The waitForAck to set.
       */
      public void setWaitForAck(boolean waitForAck) {
          this.waitForAck = waitForAck;
      }
  
      // --------------------------------------------------------- Public Methods
  
      /**
       * Count a connect request, log it at debug level and open the socket.
       * 
       * @throws java.io.IOException
       *             if the socket cannot be opened
       * @see DataSender#openSocket()
       */
      public void connect() throws java.io.IOException {
          connectCounter++;
          if (log.isDebugEnabled()) {
              Integer portValue = new Integer(port);
              log.debug(sm.getString("IDataSender.connect", address, portValue));
          }
          openSocket();
      }
  
   
      /**
       * Count a disconnect request, log it at debug level and close the socket
       * (a no-op inside closeSocket when the socket is not connected).
       * 
       * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
       * @see DataSender#closeSocket()
       */
      public void disconnect() {
          disconnectCounter++;
          if (log.isDebugEnabled()) {
              Integer portValue = new Integer(port);
              log.debug(sm.getString("IDataSender.disconnect", address,
                      portValue));
          }
          closeSocket();
      }
  
      /**
       * Close the socket when its keep-alive budget is used up: either it has
       * been open longer than keepAliveTimeout, or keepAliveMaxRequestCount
       * messages have been sent over it. Each limit is only checked when it is
       * greater than -1. Only the socket is closed (closeSocket), not the sender
       * as a whole (disconnect).
       * 
       * @return true if a limit was exceeded and closeSocket() was invoked,
       *         false otherwise
       * @see DataSender#closeSocket()
       */
      public boolean checkIfCloseSocket() {
          long socketAge = System.currentTimeMillis() - this.keepAliveConnectTime;
          boolean tooOld = keepAliveTimeout > -1
                  && socketAge > this.keepAliveTimeout;
          boolean tooManyRequests = keepAliveMaxRequestCount > -1
                  && this.keepAliveCount >= this.keepAliveMaxRequestCount;
          if (!tooOld && !tooManyRequests) {
              return false;
          }
          closeSocket();
          return true;
      }
  
      /**
       * Send a message to the remote member. This implementation simply
       * delegates to pushMessage while holding this sender's monitor.
       * 
       * @param messageid
       *            unique message id (used for trace logging in pushMessage)
       * @param data
       *            bytes to send
       * @throws java.io.IOException
       *             if pushMessage fails
       * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
       *      byte[])
       */
      public synchronized void sendMessage(String messageid, byte[] data)
              throws java.io.IOException {
          pushMessage(messageid, data);
      }
  
      /**
       * Reset the sender statistics. All counters go back to zero, except that
       * the connect and socket-open counters restart at 1 when the sender is
       * currently connected.
       */
      public synchronized void resetStatistics() {
          // traffic counters
          nrOfRequests = 0;
          totalBytes = 0;
          missingAckCounter = 0;
          dataResendCounter = 0;

          // connection counters: an open connection counts as one
          int openConnections = isConnected() ? 1 : 0;
          connectCounter = openConnections;
          socketOpenCounter = openConnections;
          disconnectCounter = 0;
          socketCloseCounter = 0;
      }
  
      /**
       * Name of this DataSender in the form
       * <code>DataSender[&lt;address&gt;:&lt;port&gt;]</code>.
       */
      public String toString() {
          return "DataSender[" + getAddress() + ":" + getPort() + "]";
      }
  
      // --------------------------------------------------------- Protected
      // Methods
  
      /**
       * Open a new socket to the remote member and restart the keep-alive
       * bookkeeping (request count and connect timestamp).
       * 
       * When waitForAck is enabled the ack timeout is installed as the socket
       * read timeout. If that configuration step fails, the freshly created
       * socket is closed again so that it does not leak, and this sender's
       * current socket reference and connected flag are left unchanged.
       * 
       * @throws IOException
       *             if the socket cannot be created
       * @throws SocketException
       *             if the read timeout cannot be set
       */
      protected void openSocket() throws IOException, SocketException {
          socketOpenCounter++;
          if (log.isDebugEnabled())
              log.debug(sm.getString("IDataSender.openSocket", address, new Integer(
                      port)));
          Socket socket = new Socket(getAddress(), getPort());
          boolean configured = false;
          try {
              if (isWaitForAck())
                  socket.setSoTimeout((int) ackTimeout);
              configured = true;
          } finally {
              if (!configured) {
                  // do not leak the half-configured socket
                  try {
                      socket.close();
                  } catch (IOException ignored) {
                      // the original failure is the one worth reporting
                  }
              }
          }
          sc = socket;
          isSocketConnected = true;
          this.keepAliveCount = 0;
          this.keepAliveConnectTime = System.currentTimeMillis();
      }
  
      /**
       * Close the socket if it is currently flagged as connected; otherwise do
       * nothing. Closing is best effort: a failure while closing is logged at
       * debug level and the sender is flagged as not connected regardless.
       * 
       * @see DataSender#disconnect()
       * @see DataSender#checkIfCloseSocket()
       */
      protected void closeSocket() {
          if (!isSocketConnected) {
              return;
          }
          socketCloseCounter++;
          if (log.isDebugEnabled())
              log.debug(sm.getString("IDataSender.socketclose",
                      address, new Integer(port)));
          try {
              sc.close();
          } catch (Exception x) {
              // best effort: never fail the caller, but leave a trace
              if (log.isDebugEnabled())
                  log.debug("Unable to close socket to " + address + ":"
                          + port, x);
          }
          isSocketConnected = false;
      }
  
      /**
       * Record one sent message in the statistics and, when debug logging is
       * enabled, log a summary line after every 100th request.
       * 
       * @param length
       *            number of bytes that were sent
       */
      protected void addStats(int length) {
          nrOfRequests++;
          totalBytes += length;
          if (!log.isDebugEnabled() || (nrOfRequests % 100) != 0) {
              return;
          }
          Object[] summary = new Object[] {
                  getAddress().getHostAddress(),
                  new Integer(getPort()),
                  new Long(totalBytes),
                  new Long(nrOfRequests),
                  new Long(totalBytes / nrOfRequests) };
          log.debug(sm.getString("IDataSender.stats", summary));
      }
  
      /**
       * Push one message to the remote member, using only one socket at a time.
       * 
       * The socket is recycled first if its keep-alive limits are exceeded and
       * (re)opened when necessary. If the first transmission fails with an
       * IOException, the socket is closed, reopened and the data is sent once
       * more; a failure of that second attempt is propagated to the caller.
       * 
       * @param messageid
       *            unique message id (only used for trace logging)
       * @param data
       *            data to send
       * @throws java.io.IOException
       *             if the socket cannot be opened or the second send attempt
       *             fails
       */
      protected synchronized void pushMessage(String messageid, byte[] data)
              throws java.io.IOException {
          checkIfCloseSocket();
          if (!isConnected()) {
              openSocket();
          }
          try {
              writeAndAwaitAck(data);
          } catch (java.io.IOException firstFailure) {
              // second try with fresh connection
              dataResendCounter++;
              if (log.isTraceEnabled()) {
                  log.trace(sm.getString("IDataSender.send.again", address,
                          new Integer(port)));
              }
              closeSocket();
              openSocket();
              writeAndAwaitAck(data);
          }
          this.keepAliveCount++;
          checkIfCloseSocket();
          addStats(data.length);
          if (log.isTraceEnabled()) {
              log.trace(sm.getString("IDataSender.send.message", address,
                      new Integer(port), messageid, new Long(data.length)));
          }
      }

      /**
       * Write the data to the current socket, flush it and, when waitForAck is
       * enabled, block until the acknowledgement has been read.
       */
      private void writeAndAwaitAck(byte[] data) throws java.io.IOException {
          sc.getOutputStream().write(data);
          sc.getOutputStream().flush();
          if (isWaitForAck()) {
              waitForAck(ackTimeout);
          }
      }
  
      /**
       * Wait for the acknowledgement from the other server: read from the socket
       * until the ack byte (value 3) arrives, skipping any other bytes.
       * 
       * The wait is bounded by the read timeout installed on the socket when it
       * was opened; the <code>timeout</code> argument itself is not used here.
       * 
       * If the peer closes the connection before the ack byte is seen, this is
       * counted as a missing ack and reported as an IOException instead of being
       * mistaken for a successful acknowledgement.
       * 
       * @param timeout
       *            acknowledgement timeout (unused, see above)
       * @throws java.io.IOException
       *             if the read times out, the stream ends before the ack was
       *             received, or another I/O error occurs
       */
      protected void waitForAck(long timeout) throws java.io.IOException {
          int i;
          try {
              java.io.InputStream in = sc.getInputStream();
              i = in.read();
              while ((i != -1) && (i != 3)) {
                  i = in.read();
              }
          } catch (java.net.SocketTimeoutException x) {
              missingAckCounter++;
              log.warn(sm.getString("IDataSender.missing.ack", getAddress(),
                      new Integer(getPort()), new Long(this.ackTimeout)));
              throw x;
          }
          if (i == -1) {
              // end of stream: the peer went away without acknowledging
              missingAckCounter++;
              throw new java.io.IOException("Connection to " + getAddress() + ":"
                      + getPort()
                      + " was closed before the acknowledgement was received");
          }
      }
  }
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org


Re: cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp DataSender.java AsyncSocketSender.java IDataSender.java PooledSocketSender.java ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicatio

Posted by shubham <pt...@sancharnet.in>.
A dangerous virus is entering your computer through this message; do not
reply!
----- Original Message -----
From: <pe...@apache.org>
To: <ja...@apache.org>
Sent: Tuesday, February 15, 2005 3:01 PM
Subject: cvs commit:
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluste
r/tcp DataSender.java AsyncSocketSender.java IDataSender.java
PooledSocketSender.java ReplicationListener.java SimpleTcpCluster.java
SocketSender.java TcpReplicationThread.java




---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org