You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/04/18 04:51:24 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util SmartQueue.java

fhanik      2003/04/17 19:51:24

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/session
                        SimpleTcpReplicationManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        IDataSender.java ReplicationListener.java
                        ReplicationTransmitter.java SimpleTcpCluster.java
                        SocketSender.java ThreadPool.java WorkerThread.java
  Added:       modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        AsyncSocketSender.java IDataSenderFactory.java
                        TcpReplicationThread.java
               modules/cluster/src/share/org/apache/catalina/cluster/util
                        SmartQueue.java
  Log:
  added in support for asynchronous replication
  
  Revision  Changes    Path
  1.8       +3 -15     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
  
  Index: SimpleTcpReplicationManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SimpleTcpReplicationManager.java	16 Apr 2003 18:49:22 -0000	1.7
  +++ SimpleTcpReplicationManager.java	18 Apr 2003 02:51:24 -0000	1.8
  @@ -554,18 +554,6 @@
           }
       }
   
  -    public IDataSender createDataSender(Member addr) {
  -        try  {
  -            Member mbr = addr;
  -            return new SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
  -        } catch ( Exception x ){
  -            log("Unable to create a socket for replication.",x);
  -        }
  -        return null;
  -
  -    }
  -
  -
       public void messageDataReceived(SessionMessage msg) {
           try {
               messageReceived(msg, msg.getAddress()!=null?(Member)msg.getAddress():null);
  
  
  
  1.2       +4 -4      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java
  
  Index: IDataSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- IDataSender.java	19 Feb 2003 20:32:11 -0000	1.1
  +++ IDataSender.java	18 Apr 2003 02:51:24 -0000	1.2
  @@ -78,6 +78,6 @@
       public int getPort();
       public void connect() throws java.io.IOException;
       public void disconnect();
  -    public void sendMessage(byte[] data) throws java.io.IOException;
  +    public void sendMessage(String sessionId, byte[] data) throws java.io.IOException;
       public boolean isConnected();
   }
  
  
  
  1.4       +9 -5      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
  
  Index: ReplicationListener.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ReplicationListener.java	3 Apr 2003 02:29:38 -0000	1.3
  +++ ReplicationListener.java	18 Apr 2003 02:51:24 -0000	1.4
  @@ -97,7 +97,11 @@
                                  int port,
                                  long timeout)
       {
  -        pool = new ThreadPool(poolSize);
  +        try {
  +            pool = new ThreadPool(poolSize, TcpReplicationThread.class);
  +        }catch ( Exception x ) {
  +            log.fatal("Unable to start thread pool for TCP listeners, session replication will fail! msg="+x.getMessage());
  +        }
           this.callback = callback;
           this.bind = bind;
           this.port = port;
  @@ -206,7 +210,7 @@
       protected void readDataFromSocket (SelectionKey key)
           throws Exception
       {
  -        WorkerThread worker = pool.getWorker();
  +        TcpReplicationThread worker = (TcpReplicationThread)pool.getWorker();
           if (worker == null) {
               // No threads available, do nothing, the selection
               // loop will keep calling this method until a
  
  
  
  1.5       +21 -21    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ReplicationTransmitter.java	26 Mar 2003 17:44:19 -0000	1.4
  +++ ReplicationTransmitter.java	18 Apr 2003 02:51:24 -0000	1.5
  @@ -63,14 +63,6 @@
   
   package org.apache.catalina.cluster.tcp;
   
  -/**
  - * <p>Title: </p>
  - * <p>Description: </p>
  - * <p>Copyright: Copyright (c) 2002</p>
  - * <p>Company: </p>
  - * @author not attributable
  - * @version 1.0
  - */
   import org.apache.catalina.cluster.io.XByteBuffer;
   
   
  @@ -131,18 +123,29 @@
           IDataSender[] result = new IDataSender[v.size()];
           return result;
       }
  +    
  +    protected void sendMessageData(String sessionId, byte[] data, IDataSender sender) throws java.io.IOException  {
  +        if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
  +        try
  +        {
  +            if (!sender.isConnected())
  +                sender.connect();
  +            sender.sendMessage(sessionId,data);
  +        }catch ( Exception x)
  +        {
  +            log.warn("Unable to send replicated message, is server down?",x);
  +        }
   
  -    public void sendMessage(byte[] indata, java.net.InetAddress addr, int port) throws java.io.IOException
  +    }
  +    public void sendMessage(String sessionId, byte[] indata, java.net.InetAddress addr, int port) throws java.io.IOException
       {
           byte[] data = XByteBuffer.createDataPackage(indata);
           String key = addr.getHostAddress()+":"+port;
           IDataSender sender = (IDataSender)map.get(key);
  -        if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
  -        if ( !sender.isConnected() ) sender.connect();
  -        sender.sendMessage(data);
  +        sendMessageData(sessionId,data,sender);
       }
   
  -    public void sendMessage(byte[] indata) throws java.io.IOException
  +    public void sendMessage(String sessionId, byte[] indata) throws java.io.IOException
       {
           java.util.Iterator i = map.entrySet().iterator();
           java.util.Vector v = new java.util.Vector();
  @@ -152,14 +155,11 @@
               IDataSender sender = (IDataSender)((java.util.Map.Entry)i.next()).getValue();
               try
               {
  -                if (!sender.isConnected())
  -                    sender.connect();
  -                sender.sendMessage(data);
  +                sendMessageData(sessionId,data,sender);
               }catch ( Exception x)
               {
  -                log.warn("Unable to send replicated message, is server down?",x);
  +                log.warn("Unable to send replicated message to "+sender+", is server down?",x);
               }
  -
           }//while
       }
   
  
  
  
  1.13      +25 -19    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SimpleTcpCluster.java	16 Apr 2003 04:06:55 -0000	1.12
  +++ SimpleTcpCluster.java	18 Apr 2003 02:51:24 -0000	1.13
  @@ -1,13 +1,7 @@
   /*
  -<<<<<<< SimpleTcpCluster.java
    * $Header$
    * $Revision$
    * $Date$
  -=======
  - * $Header$
  - * $Revision$
  - * $Date$
  ->>>>>>> 1.7
    *
    * ====================================================================
    *
  @@ -275,7 +269,12 @@
        * The channel configuration.
        */
       protected String protocol = null;
  -
  +    
  +    /**
  +     * The replication mode, can be either synchronous or asynchronous
  +     * defaults to synchronous
  +     */
  +    protected String replicationMode="synchronous";
       // ------------------------------------------------------------- Properties
   
       public SimpleTcpCluster() {
  @@ -320,7 +319,15 @@
           return(this.debug);
       }
   
  -
  +    public void setReplicationMode(String mode) {
  +        if ("synchronous".equals(mode) ||
  +            "asynchronous".equals(mode)) {
  +            log.debug("Setting replcation mode to "+mode);
  +            this.replicationMode = mode;
  +        } else 
  +            throw new IllegalArgumentException("Replication mode must be either synchronous or asynchronous");
  +        
  +    }
       /**
        * Set the name of the cluster to join, if no cluster with
        * this name is present create one.
  @@ -492,7 +499,7 @@
                                           this.tcpSelectorTimeout);
               mReplicationListener.setDaemon(true);
               mReplicationListener.start();
  -            mReplicationTransmitter = new ReplicationTransmitter(new SocketSender[0]);
  +            mReplicationTransmitter = new ReplicationTransmitter(new IDataSender[0]);
               mReplicationTransmitter.start();
   
               //wait 5 seconds to establish the view membership
  @@ -526,13 +533,14 @@
               if(destination != null) {
                     Member tcpdest = dest;
                     if ( (tcpdest != null) && (!localMember.equals(tcpdest)))  {
  -                       mReplicationTransmitter.sendMessage(data,
  +                       mReplicationTransmitter.sendMessage(msg.getSessionID(),
  +                                                           data,
                                                              InetAddress.getByName(tcpdest.getHost()),
                                                              tcpdest.getPort());
                     }//end if
               }
               else {
  -                mReplicationTransmitter.sendMessage(data);
  +                mReplicationTransmitter.sendMessage(msg.getSessionID(),data);
               }
           } catch ( Exception x ) {
               log.error("Unable to send message through tcp channel",x);
  @@ -568,9 +576,7 @@
           try  {
               log.info("Replication member added:" + member);
               Member mbr = member;
  -            mReplicationTransmitter.add(
  -                new SocketSender(InetAddress.getByName(mbr.getHost()),
  -                                 mbr.getPort()));
  +            mReplicationTransmitter.add(IDataSenderFactory.getIDataSender(replicationMode,mbr));
           } catch ( Exception x ) {
               log.error("Unable to connect to replication system.",x);
           }
  @@ -589,7 +595,7 @@
           }
           catch ( Exception x )
           {
  -            log.error("Unable remove cluster node from replicaiton system.",x);
  +            log.error("Unable remove cluster node from replication system.",x);
           }
   
       }
  
  
  
  1.2       +10 -12    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
  
  Index: SocketSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SocketSender.java	19 Feb 2003 20:32:11 -0000	1.1
  +++ SocketSender.java	18 Apr 2003 02:51:24 -0000	1.2
  @@ -63,9 +63,6 @@
   
   package org.apache.catalina.cluster.tcp;
   import java.net.InetAddress ;
  -import java.nio.channels.SocketChannel;
  -import java.net.InetSocketAddress;
  -import java.nio.ByteBuffer;
   import java.net.Socket;
   
   /**
  @@ -83,7 +80,6 @@
       private InetAddress address;
       private int port;
       private Socket sc = null;
  -    protected ByteBuffer dbuf = ByteBuffer.allocateDirect(1024);
       private boolean isSocketConnected = false;
   
       public SocketSender(InetAddress host, int port)
  @@ -104,12 +100,8 @@
   
       public void connect() throws java.io.IOException
       {
  -        //InetSocketAddress isa = new InetSocketAddress(getAddress(), getPort());
           sc = new Socket(getAddress(),getPort());
           isSocketConnected = true;
  -        // Connect
  -        //sc = SocketChannel.open();
  -        //sc.connect(isa);
       }
   
       public void disconnect()
  @@ -132,7 +124,7 @@
        * @param data
        * @throws java.io.IOException
        */
  -    public synchronized void sendMessage(byte[] data) throws java.io.IOException
  +    public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException
       {
           if ( !isConnected() ) connect();
           try
  @@ -145,6 +137,12 @@
               connect();
               sc.getOutputStream().write(data);
           }
  +    }
  +    
  +    public String toString() {
  +        StringBuffer buf = new StringBuffer("SocketSender[");
  +        buf.append(getAddress()).append(":").append(getPort()).append("]");
  +        return buf.toString();
       }
   
   
  
  
  
  1.2       +7 -7      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java
  
  Index: ThreadPool.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ThreadPool.java	19 Feb 2003 20:32:11 -0000	1.1
  +++ ThreadPool.java	18 Apr 2003 02:51:24 -0000	1.2
  @@ -84,14 +84,14 @@
   
       List idle = new LinkedList();
   
  -    ThreadPool (int poolSize)
  -    {
  +    ThreadPool (int poolSize, Class threadClass) throws Exception {
           // fill up the pool with worker threads
           for (int i = 0; i < poolSize; i++) {
  -            WorkerThread thread = new WorkerThread (this);
  +            WorkerThread thread = (WorkerThread)threadClass.newInstance();
  +            thread.setPool(this);
   
               // set thread name for debugging, start it
  -            thread.setName ("Tcp Replication Thread[" + (i + 1)+"]");
  +            thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
               thread.setDaemon(true);
               thread.start();
   
  
  
  
  1.3       +10 -116   jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java
  
  Index: WorkerThread.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- WorkerThread.java	20 Mar 2003 20:46:14 -0000	1.2
  +++ WorkerThread.java	18 Apr 2003 02:51:24 -0000	1.3
  @@ -62,38 +62,18 @@
    */
   
   package org.apache.catalina.cluster.tcp;
  -import java.nio.ByteBuffer;
  -import java.nio.channels.SelectionKey;
  -import java.nio.channels.Selector;
  -import java.util.List;
  -import java.io.IOException;
  -import java.nio.channels.SocketChannel;
  -import org.apache.catalina.cluster.io.ObjectReader;
  -import java.util.LinkedList;
  -/**
  -     * A worker thread class which can drain channels and echo-back
  -     * the input.  Each instance is constructed with a reference to
  -     * the owning thread pool object. When started, the thread loops
  -     * forever waiting to be awakened to service the channel associated
  -     * with a SelectionKey object.
  -     * The worker is tasked by calling its serviceChannel() method
  -     * with a SelectionKey object.  The serviceChannel() method stores
  -     * the key reference in the thread object then calls notify()
  -     * to wake it up.  When the channel has been drained, the worker
  -     * thread returns itself to its parent pool.
  -     */
  +
  +
   public class WorkerThread extends Thread
   {
       private static org.apache.commons.logging.Log log =
           org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
  -    private ByteBuffer buffer = ByteBuffer.allocate (1024);
  -    private ThreadPool pool;
  -    private SelectionKey key;
  -    private boolean doRun = true;
  +    protected ThreadPool pool;
  +    protected boolean doRun = true;
   
  -    WorkerThread (ThreadPool pool)
  -    {
  -        this.pool = pool;
  +   
  +    public void setPool(ThreadPool pool) {
  +        this.pool = pool;        
       }
   
       public synchronized void close()
  @@ -101,91 +81,5 @@
           doRun = false;
           notify();
   
  -    }
  -
  -    // loop forever waiting for work to do
  -    public synchronized void run()
  -    {
  -        while (doRun) {
  -            try {
  -                // sleep and release object lock
  -                this.wait();
  -            } catch (InterruptedException e) {
  -                log.info("TCP worker thread interrupted in cluster",e);
  -                // clear interrupt status
  -                this.interrupted();
  -            }
  -            if (key == null) {
  -                continue;	// just in case
  -            }
  -            try {
  -                drainChannel (key);
  -            } catch (Exception e) {
  -                log.info ("TCP Worker thread in cluster caught '"
  -                    + e + "' closing channel");
  -                // close channel and nudge selector
  -                try {
  -                    key.channel().close();
  -                } catch (IOException ex) {
  -                    log.error("Unable to close channel.",ex);
  -                }
  -                key.selector().wakeup();
  -            }
  -            key = null;
  -            // done, ready for more, return to pool
  -            this.pool.returnWorker (this);
  -        }
  -    }
  -
  -    /**
  -     * Called to initiate a unit of work by this worker thread
  -     * on the provided SelectionKey object.  This method is
  -     * synchronized, as is the run() method, so only one key
  -     * can be serviced at a given time.
  -     * Before waking the worker thread, and before returning
  -     * to the main selection loop, this key's interest set is
  -     * updated to remove OP_READ.  This will cause the selector
  -     * to ignore read-readiness for this channel while the
  -     * worker thread is servicing it.
  -     */
  -    synchronized void serviceChannel (SelectionKey key)
  -    {
  -        this.key = key;
  -        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
  -        this.notify();		// awaken the thread
  -    }
  -
  -    /**
  -     * The actual code which drains the channel associated with
  -     * the given key.  This method assumes the key has been
  -     * modified prior to invocation to turn off selection
  -     * interest in OP_READ.  When this method completes it
  -     * re-enables OP_READ and calls wakeup() on the selector
  -     * so the selector will resume watching this channel.
  -     */
  -    void drainChannel (SelectionKey key)
  -        throws Exception
  -    {
  -        SocketChannel channel = (SocketChannel) key.channel();
  -        int count;
  -        buffer.clear();			// make buffer empty
  -        ObjectReader reader = (ObjectReader)key.attachment();
  -        // loop while data available, channel is non-blocking
  -        while ((count = channel.read (buffer)) > 0) {
  -            buffer.flip();		// make buffer readable
  -            reader.append(buffer.array(),0,count);
  -            buffer.clear();		// make buffer empty
  -        }
  -        //check to see if any data is available
  -        reader.execute();
  -        if (count < 0) {
  -            // close channel on EOF, invalidates the key
  -            channel.close();
  -            return;
  -        }
  -        // resume interest in OP_READ
  -        key.interestOps (key.interestOps() | SelectionKey.OP_READ);
  -        // cycle the selector so this key is active again
  -        key.selector().wakeup();
       }
   }
  
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
  
  Index: AsyncSocketSender.java
  ===================================================================
  /*
   * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2003/04/18 02:51:24 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.tcp;
  
  import java.net.InetAddress ;
  import java.net.Socket;
  import java.io.IOException;
  import org.apache.catalina.cluster.util.SmartQueue;
  public class AsyncSocketSender implements IDataSender {
      private InetAddress address;
      private int port;
      private Socket sc = null;
      private boolean isSocketConnected = false;
      private SmartQueue queue = new SmartQueue();
      
      public AsyncSocketSender(InetAddress host, int port)  {
          this.address = host;
          this.port = port;
          QueueThread t = new QueueThread(this);
          t.setDaemon(true);
          t.start();
          SimpleTcpCluster.log.info("Started async sender thread for TCP replication.");
      }
  
      public InetAddress getAddress() {
          return address;
      }
  
      public int getPort() {
          return port;
      }
  
      public void connect() throws java.io.IOException  {
          sc = new Socket(getAddress(),getPort());
          isSocketConnected = true;
      }
  
      public void disconnect()  {
          try
          {
              sc.close();
          }catch ( Exception x)
          {}
          isSocketConnected = false;
      }
  
      public boolean isConnected() {
          return isSocketConnected;
      }
  
      /**
       * Blocking send
       * @param data
       * @throws java.io.IOException
       */
      private synchronized void sendMessage(byte[] data) throws java.io.IOException  {
          if ( !isConnected() ) connect();
          try
          {
              sc.getOutputStream().write(data);
          }
          catch ( java.io.IOException x )
          {
              disconnect();
              connect();
              sc.getOutputStream().write(data);
          }
      }
  
      public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException {
          SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(sessionId,data);
          queue.add(entry);
      }
  
      public String toString() {
          StringBuffer buf = new StringBuffer("SocketSender[");
          buf.append(getAddress()).append(":").append(getPort()).append("]");
          return buf.toString();
      }
      
      private class QueueThread extends Thread {
          AsyncSocketSender sender;
          public QueueThread(AsyncSocketSender sender) {
              this.sender = sender;
          }
          
          public void run() {
              while (true) {
                  SmartQueue.SmartEntry entry = sender.queue.remove();
                  if ( entry != null ) {
                      try {
                          byte[] data = (byte[]) entry.getValue();
                          sender.sendMessage(data);
                      }
                      catch (Exception x) {
                          SimpleTcpCluster.log.warn(
                              "Unable to asynchronously send session w/ id=" +
                              entry.getKey()+" message will be ignored.");
                      }
                  }
              }
          }
      }
  }
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java
  
  Index: IDataSenderFactory.java
  ===================================================================
  /*
   * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2003/04/18 02:51:24 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.tcp;
  import org.apache.catalina.cluster.Member;
  import java.net.InetAddress;
  public class IDataSenderFactory {
      private IDataSenderFactory() {
      }
      public static final String SYNC_MODE="synchronous";
      public static final String ASYNC_MODE="asynchronous";
      public synchronized static IDataSender getIDataSender(String mode, Member mbr) 
      throws java.io.IOException {
          if (SYNC_MODE.equals(mode) )
              return new SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
          else if ( ASYNC_MODE.equals(mode) )
              return new AsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
          else
              throw new java.io.IOException("Invalid replication mode="+mode);
      }
      
  
  }
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  /*
   * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2003/04/18 02:51:24 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.tcp;
  import java.nio.ByteBuffer;
  import java.nio.channels.SelectionKey;
  import java.nio.channels.Selector;
  import java.util.List;
  import java.io.IOException;
  import java.nio.channels.SocketChannel;
  import org.apache.catalina.cluster.io.ObjectReader;
  import java.util.LinkedList;
  /**
       * A worker thread class which can drain channels and echo-back
       * the input.  Each instance is constructed with a reference to
       * the owning thread pool object. When started, the thread loops
       * forever waiting to be awakened to service the channel associated
       * with a SelectionKey object.
       * The worker is tasked by calling its serviceChannel() method
       * with a SelectionKey object.  The serviceChannel() method stores
       * the key reference in the thread object then calls notify()
       * to wake it up.  When the channel has been drained, the worker
       * thread returns itself to its parent pool.
       */
  public class TcpReplicationThread extends WorkerThread
  {
      private static org.apache.commons.logging.Log log =
          org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
      private ByteBuffer buffer = ByteBuffer.allocate (1024);
      private SelectionKey key;
  
  
      TcpReplicationThread ()
      {
      }
  
      // loop forever waiting for work to do
      public synchronized void run()
      {
          while (doRun) {
              try {
                  // sleep and release object lock
                  this.wait();
              } catch (InterruptedException e) {
                  log.info("TCP worker thread interrupted in cluster",e);
                  // clear interrupt status
                  this.interrupted();
              }
              if (key == null) {
                  continue;	// just in case
              }
              try {
                  drainChannel (key);
              } catch (Exception e) {
                  log.info ("TCP Worker thread in cluster caught '"
                      + e + "' closing channel");
                  // close channel and nudge selector
                  try {
                      key.channel().close();
                  } catch (IOException ex) {
                      log.error("Unable to close channel.",ex);
                  }
                  key.selector().wakeup();
              }
              key = null;
              // done, ready for more, return to pool
              this.pool.returnWorker (this);
          }
      }
  
      /**
       * Called to initiate a unit of work by this worker thread
       * on the provided SelectionKey object.  This method is
       * synchronized, as is the run() method, so only one key
       * can be serviced at a given time.
       * Before waking the worker thread, and before returning
       * to the main selection loop, this key's interest set is
       * updated to remove OP_READ.  This will cause the selector
       * to ignore read-readiness for this channel while the
       * worker thread is servicing it.
       */
      synchronized void serviceChannel (SelectionKey key)
      {
          this.key = key;
          key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
          this.notify();		// awaken the thread
      }
  
      /**
       * The actual code which drains the channel associated with
       * the given key.  This method assumes the key has been
       * modified prior to invocation to turn off selection
       * interest in OP_READ.  When this method completes it
       * re-enables OP_READ and calls wakeup() on the selector
       * so the selector will resume watching this channel.
       */
      void drainChannel (SelectionKey key)
          throws Exception
      {
          SocketChannel channel = (SocketChannel) key.channel();
          int count;
          buffer.clear();			// make buffer empty
          ObjectReader reader = (ObjectReader)key.attachment();
          // loop while data available, channel is non-blocking
          while ((count = channel.read (buffer)) > 0) {
              buffer.flip();		// make buffer readable
              reader.append(buffer.array(),0,count);
              buffer.clear();		// make buffer empty
          }
          //check to see if any data is available
          reader.execute();
          if (count < 0) {
              // close channel on EOF, invalidates the key
              channel.close();
              return;
          }
          // resume interest in OP_READ
          key.interestOps (key.interestOps() | SelectionKey.OP_READ);
          // cycle the selector so this key is active again
          key.selector().wakeup();
      }
  }
  
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java
  
  Index: SmartQueue.java
  ===================================================================
  /*
   * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java,v 1.1 2003/04/18 02:51:24 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2003/04/18 02:51:24 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.util;
  
  /**
   * A smart queue, used for async replication<BR>
   * the "smart" part of this queue is that if the session is already queued for replication,
   * and it is updated again, the session will simply be replaced, hence we don't 
   * replicate stuff that is obsolete.
   * Put this into util, since it is quite  generic.
   * 
   * @author Filip Hanik
   * @version 1.0
   */
   
  
  import java.util.LinkedList;
  import java.util.HashMap;
  
  public class SmartQueue {
      /**
       * This is the actual queue
       */
      private LinkedList queue = new LinkedList();
      /**
       * And this is only for performance, fast lookups
       */
      private HashMap queueMap = new HashMap();
      
      private Object mutex = new Object();
      public static int debug = 0;
      
      public SmartQueue() {
      }
      
      /**
       * Add an object to the queue
       * @param entry - the smart entry
       */
      public void add(SmartEntry entry) {
          /*make sure we are within a synchronized block since we are dealing with two
            unsync collections*/
          synchronized (mutex) {
              /*check to see if this object has already been queued*/
              SmartEntry current = (SmartEntry)queueMap.get(entry.getKey());
              if ( current == null ) {
                  /*the object has not been queued, at it to the end of the queue*/
                  if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Adding new object="+entry);
                  queue.addLast(entry);
                  queueMap.put(entry.getKey(),entry);
              }else {
                  /*the object has been queued, replace the value*/
                  if ( debug != 0 ) System.out.print("["+Thread.currentThread().getName()+"][SmartQueue] Replacing old object="+current);
                  current.setValue(entry.getValue());
                  if ( debug != 0 ) System.out.println("with new object="+current);
              }
              /*wake up all the threads that are waiting for the lock to be released*/
              mutex.notifyAll();
          }
      }
      
      public int size() {
          synchronized (mutex) {
              return queue.size();            
          }
      }
      
      /**
       * Blocks forever until an element has been added to the queue
       * @return
       */
      public SmartEntry remove() {
          SmartEntry result = null;        
          synchronized (mutex) {
              while ( size() == 0 ) {
                  try {
                      if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Queue sleeping until object added size="+size()+".");
                      mutex.wait();
                      if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Queue woke up or interrupted size="+size()+".");
                  }
                  catch(IllegalMonitorStateException ex) {
                      throw ex;
                  }
                  catch(InterruptedException ex) {
                  }//catch
              }//while
              /*guaranteed that we are not empty by now*/
              result = (SmartEntry)queue.removeFirst();
              queueMap.remove(result.getKey());
              if ( debug != 0 ) System.out.println("["+Thread.currentThread().getName()+"][SmartQueue] Returning="+result);
          }
          return result;
      }
      
      
      
      public static class SmartEntry {
          protected Object key;
          protected Object value;
          public SmartEntry(Object key,
                                 Object value) {
              if ( key == null ) throw new IllegalArgumentException("SmartEntry key can not be null.");
              if ( value == null ) throw new IllegalArgumentException("SmartEntry value can not be null.");
              this.key = key;
              this.value = value;
          }
          
          public Object getKey() {
              return key;
          }
          
          public Object getValue() {
              return value;
          }
          
          public void setValue(Object value) {
              if ( value == null ) throw new IllegalArgumentException("SmartEntry value can not be null.");
              this.value = value;
          }
          
          public int hashCode() {
              return key.hashCode();
          }
          
          public boolean equals(Object o) {
              if (!(o instanceof SmartEntry)) return false;
              SmartEntry other = (SmartEntry)o;
              return other.getKey().equals(getKey());
          }
          
          public String toString() {
              return "[SmartyEntry key="+key+" value="+value+"]";
          }
      }
      
  
  }
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org