You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/03/25 23:10:25 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationListener.java

pero        2005/03/25 14:10:25

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationListener.java
  Log:
  Change attribute name waitForAck to sendAck
  Add compress/uncompress message data transfer
  Update some documentation
  
  Revision  Changes    Path
  1.20      +74 -24    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
  
  Index: ReplicationListener.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -r1.19 -r1.20
  --- ReplicationListener.java	15 Feb 2005 09:31:45 -0000	1.19
  +++ ReplicationListener.java	25 Mar 2005 22:10:25 -0000	1.20
  @@ -17,26 +17,44 @@
   package org.apache.catalina.cluster.tcp;
   
   
  +import java.net.InetSocketAddress;
  +import java.net.ServerSocket;
  +import java.nio.channels.SelectableChannel;
  +import java.nio.channels.SelectionKey;
  +import java.nio.channels.Selector;
   import java.nio.channels.ServerSocketChannel;
   import java.nio.channels.SocketChannel;
  -import java.nio.channels.Selector;
  -import java.nio.channels.SelectionKey;
  -import java.nio.channels.SelectableChannel;
  -
  -import java.net.ServerSocket;
  -import java.net.InetSocketAddress;
   import java.util.Iterator;
  -import org.apache.catalina.cluster.io.ListenCallback;
  -import org.apache.catalina.cluster.io.ObjectReader;
  +
   import org.apache.catalina.cluster.CatalinaCluster;
   import org.apache.catalina.cluster.ClusterReceiver;
  +import org.apache.catalina.cluster.tcp.Constants;
  +import org.apache.catalina.cluster.io.ListenCallback;
  +import org.apache.catalina.cluster.io.ObjectReader;
  +import org.apache.catalina.util.StringManager;
   /**
  - */
  +* FIXME i18n log messages
  +* FIXME jmx support
  +* @author Peter Rossbach
  +* @author Filip Hanik
  +* @version $Revision$ $Date$
  +*/
   public class ReplicationListener implements Runnable,ClusterReceiver
   {
  -
       private static org.apache.commons.logging.Log log =
           org.apache.commons.logging.LogFactory.getLog( ReplicationListener.class );
  +
  +    /**
  +     * The descriptive information about this implementation.
  +     */
  +    private static final String info = "ReplicationListener/1.1";
  +
  +    /**
  +     * The string manager for this package.
  +     */
  +    protected StringManager sm = StringManager.getManager(Constants.Package);
  +
  +    
       private ThreadPool pool = null;
       private boolean doListen = false;
       private ListenCallback callback;
  @@ -45,7 +63,12 @@
       private int tcpThreadCount;
       private long tcpSelectorTimeout;
       private int tcpListenPort;
  -    private boolean waitForAck;
  +    private boolean sendAck;
  +    /**
  +     * Compress message data bytes
  +     */
  +    private boolean compress = true ;
  +    
       private Selector selector = null;
       
       private Object interestOpsMutex = new Object();
  @@ -53,6 +76,24 @@
       public ReplicationListener() {
       }
   
  +    /**
  +     * @return the current value of the compress message data flag.
  +     */
  +    public boolean isCompress() {
  +        return compress;
  +    }
  +    
  +    /**
  +     * @param compressMessageData the new value of the compress message data flag.
  +     */
  +    public void setCompress(boolean compressMessageData) {
  +        this.compress = compressMessageData;
  +    }
  +    
  +    /**
  +     * start cluster receiver
  +     * @see org.apache.catalina.cluster.ClusterReceiver#start()
  +     */
       public void start() {
           try {
               pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
  @@ -89,6 +130,11 @@
           }
       }
   
  +    /**
  +     * get data from channel and store in byte array
  +     * send it to cluster
  +     * @throws Exception
  +     */
       public void listen ()
           throws Exception
       {
  @@ -134,11 +180,12 @@
                           ServerSocketChannel server =
                               (ServerSocketChannel) key.channel();
                           SocketChannel channel = server.accept();
  +                        Object attach  = attach = new ObjectReader(channel, selector,
  +                                    callback,isCompress()) ;
                           registerChannel(selector,
                                           channel,
                                           SelectionKey.OP_READ,
  -                                        new ObjectReader(channel, selector,
  -                            callback));
  +                                        attach);
                       }
                       // is there data to read on this channel?
                       if (key.isReadable()) {
  @@ -159,7 +206,7 @@
                   log.error("Unable to process request in ReplicationListener", x);
               }
   
  -        } //while
  +        }
           serverChannel.close();
           selector.close();
       }
  @@ -180,7 +227,10 @@
           callback = cluster;
       }
   
  -
  +    public CatalinaCluster getCatalinaCluster() {
  +        return (CatalinaCluster)callback ;
  +    }
  +    
       // ----------------------------------------------------------
   
       /**
  @@ -216,13 +266,13 @@
           if (worker == null) {
               // No threads available, do nothing, the selection
               // loop will keep calling this method until a
  -            // thread becomes available.  This design could
  -            // be improved.
  -            return;
  +            // thread becomes available.
  +            // FIXME: This design could be improved.
  +            if(log.isDebugEnabled())
  +                log.debug("No TcpReplicationThread available");
           } else {
               // invoking this wakes up the worker thread then returns
  -            worker.serviceChannel(key, waitForAck);
  -            return;
  +            worker.serviceChannel(key, sendAck);
           }
       }
       public String getTcpListenAddress() {
  @@ -249,11 +299,11 @@
       public void setTcpThreadCount(int tcpThreadCount) {
           this.tcpThreadCount = tcpThreadCount;
       }
  -    public boolean isWaitForAck() {
  -        return waitForAck;
  +    public boolean isSendAck() {
  +        return sendAck;
       }
  -    public void setWaitForAck(boolean waitForAck) {
  -        this.waitForAck = waitForAck;
  +    public void setSendAck(boolean sendAck) {
  +        this.sendAck = sendAck;
       }
       
       public String getHost() {
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org