You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/12/18 05:20:15 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp Jdk13ReplicationListener.java ReplicationListener.java ReplicationTransmitter.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java

fhanik      2003/12/17 20:20:15

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/io
                        ObjectReader.java
               modules/cluster/src/share/org/apache/catalina/cluster/mcast
                        McastService.java McastServiceImpl.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationListener.java
                        ReplicationTransmitter.java SimpleTcpCluster.java
                        SocketSender.java TcpReplicationThread.java
  Added:       modules/cluster/src/share/org/apache/catalina/cluster/io
                        Jdk13ObjectReader.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        Jdk13ReplicationListener.java
  Log:
  adding in a regular io cluster listener to be used with JDK1.3
  
  Revision  Changes    Path
  1.2       +14 -17    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
  
  Index: ObjectReader.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ObjectReader.java	19 Feb 2003 20:32:10 -0000	1.1
  +++ ObjectReader.java	18 Dec 2003 04:20:14 -0000	1.2
  @@ -102,25 +102,22 @@
           return this.channel;
       }
   
  -    public boolean append(byte[] data,int off,int len) {
  +    public int append(byte[] data,int off,int len) {
           boolean result = false;
           buffer.append(data,off,len);
  -        if ( buffer.doesPackageExist() ) {
  +        int pkgCnt = 0;
  +        boolean pkgExists = buffer.doesPackageExist();
  +        while ( pkgExists ) {
               byte[] b = buffer.extractPackage(true);
               callback.messageDataReceived(b);
  -            result = true;
  +            pkgCnt++;
  +            pkgExists = buffer.doesPackageExist();
           }//end if
  -        return result;
  +        return pkgCnt;
       }
   
  -    public boolean execute() {
  -        boolean result = false;
  -        if ( buffer.doesPackageExist() ) {
  -            byte[] data = buffer.extractPackage(true);
  -            callback.messageDataReceived(data);
  -            result = true;
  -        }//end if
  -        return result;
  +    public int execute() {
  +        return append(new byte[0],0,0);
       }
   
       public int write(ByteBuffer buf)
  @@ -131,4 +128,4 @@
   
   
   
  -}
  \ No newline at end of file
  +}
  
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java
  
  Index: Jdk13ObjectReader.java
  ===================================================================
  /*
   * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v 1.1 2003/12/18 04:20:14 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2003/12/18 04:20:14 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.io;
  
  /**
   * The object reader object is an object used in conjunction with
   * java.nio TCP messages. This object stores the message bytes in a
   * <code>XByteBuffer</code> until a full package has been received.
   * When a full package has been received, the append method will call messageDataReceived
   * on the callback object associated with this object reader.<BR>
   * This object uses an XByteBuffer which is an extendable object buffer that also allows
   * for message encoding and decoding.
   *
   * @author Filip Hanik
   * @version $Revision: 1.1 $, $Date: 2003/12/18 04:20:14 $
   */
  
  import java.net.Socket;
  import java.nio.ByteBuffer;
  import java.io.IOException;
  import org.apache.catalina.cluster.io.XByteBuffer;
  public class Jdk13ObjectReader
  {
      private Socket socket;
      private ListenCallback callback;
      private XByteBuffer buffer;
  
      public Jdk13ObjectReader( Socket socket,
                                 ListenCallback callback )  {
          this.socket = socket;
          this.callback = callback;
          this.buffer = new XByteBuffer();
      }
  
      public int append(byte[] data,int off,int len) {
          boolean result = false;
          buffer.append(data,off,len);
          int pkgCnt = 0;
          boolean pkgExists = buffer.doesPackageExist();
          while ( pkgExists ) {
              byte[] b = buffer.extractPackage(true);
              callback.messageDataReceived(b);
              pkgCnt++;
              pkgExists = buffer.doesPackageExist();
          }//end if
          return pkgCnt;
      }
  
      public int execute() {
          return append(new byte[0],0,0);
      }
  
      public int write(byte[] data)
         throws java.io.IOException {
         socket.getOutputStream().write(data);
         return 0;
  
      }
  
  
  
  
  }
  
  
  
  1.4       +6 -6      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
  
  Index: McastService.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- McastService.java	16 Nov 2003 22:22:45 -0000	1.3
  +++ McastService.java	18 Dec 2003 04:20:14 -0000	1.4
  @@ -165,7 +165,7 @@
           String host = getProperties().getProperty("tcpListenHost");
           int port = Integer.parseInt(getProperties().getProperty("tcpListenPort"));
           String name = "tcp://"+host+":"+port;
  -        localMember = new McastMember(name,host,port,0);
  +        localMember = new McastMember(name,host,port,100);
           impl = new McastServiceImpl((McastMember)localMember,Long.parseLong(properties.getProperty("msgFrequency")),
                                       Long.parseLong(properties.getProperty("memberDropTime")),
                                       Integer.parseInt(properties.getProperty("mcastPort")),
  @@ -244,4 +244,4 @@
           service.start();
           Thread.currentThread().sleep(60*1000*60);
       }
  -}
  \ No newline at end of file
  +}
  
  
  
  1.5       +7 -6      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
  
  Index: McastServiceImpl.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- McastServiceImpl.java	16 Nov 2003 22:22:45 -0000	1.4
  +++ McastServiceImpl.java	18 Dec 2003 04:20:14 -0000	1.5
  @@ -140,7 +140,7 @@
       /**
        * When was the service started
        */
  -    protected long serviceStartTime = 0;
  +    protected long serviceStartTime = System.currentTimeMillis();
   
       /**
        * Create a new mcast service impl
  @@ -186,6 +186,7 @@
        */
       public synchronized void start() throws IOException {
           if ( doRun ) throw new IllegalStateException("Service already running.");
  +        serviceStartTime = System.currentTimeMillis();
           socket.joinGroup(address);
           doRun = true;
           sender = new SenderThread(sendFrequency);
  @@ -194,7 +195,7 @@
           receiver.setDaemon(true);
           receiver.start();
           sender.start();
  -        serviceStartTime = System.currentTimeMillis();
  +
       }
   
       /**
  
  
  
  1.6       +40 -31    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
  
  Index: ReplicationListener.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- ReplicationListener.java	15 Dec 2003 21:33:06 -0000	1.5
  +++ ReplicationListener.java	18 Dec 2003 04:20:15 -0000	1.6
  @@ -142,33 +142,46 @@
           while (doListen) {
               // this may block for a long time, upon return the
               // selected set contains keys of the ready channels
  -            int n = selector.select(timeout);
  -            if (n == 0) {
  -                continue;	// nothing to do
  -            }
  -            // get an iterator over the set of selected keys
  -            Iterator it = selector.selectedKeys().iterator();
  -            // look at each key in the selected set
  -            while (it.hasNext()) {
  -                SelectionKey key = (SelectionKey) it.next();
  -                // Is a new connection coming in?
  -                if (key.isAcceptable()) {
  -                    ServerSocketChannel server =
  -                        (ServerSocketChannel) key.channel();
  -                    SocketChannel channel = server.accept();
  -                    registerChannel (selector,
  -                                     channel,
  -                                     SelectionKey.OP_READ | SelectionKey.OP_WRITE,
  -                                     new ObjectReader(channel,selector,callback));
  +            try {
  +
  +                int n = selector.select(timeout);
  +                if (n == 0) {
  +                    continue; // nothing to do
                   }
  -                // is there data to read on this channel?
  -                if (key.isReadable()) {
  -                    readDataFromSocket (key);
  +                // get an iterator over the set of selected keys
  +                Iterator it = selector.selectedKeys().iterator();
  +                // look at each key in the selected set
  +                while (it.hasNext()) {
  +                    SelectionKey key = (SelectionKey) it.next();
  +                    // Is a new connection coming in?
  +                    if (key.isAcceptable()) {
  +                        ServerSocketChannel server =
  +                            (ServerSocketChannel) key.channel();
  +                        SocketChannel channel = server.accept();
  +                        registerChannel(selector,
  +                                        channel,
  +                                        SelectionKey.OP_READ |
  +                                        SelectionKey.OP_WRITE,
  +                                        new ObjectReader(channel, selector,
  +                            callback));
  +                    }
  +                    // is there data to read on this channel?
  +                    if (key.isReadable()) {
  +                        readDataFromSocket(key);
  +                    }
  +                    // remove key from selected set, it's been handled
  +                    it.remove();
                   }
  -                // remove key from selected set, it's been handled
  -                it.remove();
               }
  -        }//while
  +            catch (java.nio.channels.CancelledKeyException nx) {
  +                log.warn(
  +                    "Replication client disconnected, error when polling key. Ignoring client.");
  +            }
  +            catch (Exception x) {
  +                log.error("Unable to process request in ReplicationListener", x);
  +            }
  +
  +        } //while
           serverChannel.close();
           selector.close();
       }
  @@ -197,10 +210,6 @@
       }
   
       // ----------------------------------------------------------
  -
  -    // Use the same byte buffer for all channels.  A single thread is
  -    // servicing all the channels, so no danger of concurrent acccess.
  -    private ByteBuffer buffer = ByteBuffer.allocateDirect (1024);
   
       /**
        * Sample data handler method for a channel with data ready to read.
  
  
  
  1.8       +11 -6     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- ReplicationTransmitter.java	15 Oct 2003 03:29:32 -0000	1.7
  +++ ReplicationTransmitter.java	18 Dec 2003 04:20:15 -0000	1.8
  @@ -124,7 +124,7 @@
           v.copyInto(result);
           return result;
       }
  -    
  +
       protected void sendMessageData(String sessionId, byte[] data, IDataSender sender) throws java.io.IOException  {
           if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
           try
  @@ -132,9 +132,14 @@
               if (!sender.isConnected())
                   sender.connect();
               sender.sendMessage(sessionId,data);
  +            sender.setSuspect(false);
           }catch ( Exception x)
           {
  -            log.warn("Unable to send replicated message, is server down?",x);
  +            if ( !sender.getSuspect() ) {
  +                log.warn("Unable to send replicated message, is server down?",
  +                         x);
  +            }
  +            sender.setSuspect(true);
           }
   
       }
  @@ -169,4 +174,4 @@
   
   
   
  -}
  \ No newline at end of file
  +}
  
  
  
  1.21      +42 -20    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- SimpleTcpCluster.java	15 Dec 2003 21:33:06 -0000	1.20
  +++ SimpleTcpCluster.java	18 Dec 2003 04:20:15 -0000	1.21
  @@ -162,10 +162,6 @@
       protected int tcpThreadCount = 2;
   
       /**
  -     * ReplicationListener to listen for incoming data
  -     */
  -    protected ReplicationListener mReplicationListener;
  -    /**
        * ReplicationTransmitter to send data with
        */
       protected ReplicationTransmitter mReplicationTransmitter;
  @@ -272,6 +268,12 @@
        * defaults to synchronous
        */
       protected String replicationMode="synchronous";
  +
  +    private long nrOfMsgsReceived = 0;
  +    private long msgSendTime = 0;
  +    private long lastChecked = System.currentTimeMillis();
  +    private boolean isJdk13 = false;
  +
       // ------------------------------------------------------------- Properties
   
       public SimpleTcpCluster() {
  @@ -475,15 +477,31 @@
                   (sm.getString("cluster.alreadyStarted"));
           log.info("Cluster is about to start");
           try {
  -            mReplicationListener =
  -                new ReplicationListener(this,
  -                                        this.tcpThreadCount,
  -                                        this.tcpAddress,
  -                                        this.tcpPort,
  -                                        this.tcpSelectorTimeout,
  -                                        "synchronous".equals(this.replicationMode));
  -            mReplicationListener.setDaemon(true);
  -            mReplicationListener.start();
  +            if ( isJdk13 ) {
  +                Jdk13ReplicationListener mReplicationListener =
  +                    new Jdk13ReplicationListener(this,
  +                                            this.tcpThreadCount,
  +                                            this.tcpAddress,
  +                                            this.tcpPort,
  +                                            this.tcpSelectorTimeout,
  +                                            "synchronous".equals(this.
  +                    replicationMode));
  +                Thread t = new Thread(mReplicationListener);
  +                t.setDaemon(true);
  +                t.start();
  +            } else {
  +                ReplicationListener mReplicationListener =
  +                    new ReplicationListener(this,
  +                                            this.tcpThreadCount,
  +                                            this.tcpAddress,
  +                                            this.tcpPort,
  +                                            this.tcpSelectorTimeout,
  +                                            "synchronous".equals(this.
  +                    replicationMode));
  +                mReplicationListener.setDaemon(true);
  +                mReplicationListener.start();
  +            }
  +
               mReplicationTransmitter = new ReplicationTransmitter(new IDataSender[0]);
               mReplicationTransmitter.start();
   
  @@ -777,15 +795,19 @@
       // ---------------------------------------------  Inner Class
   
       // ---------------------------------------------  Performance
  -    private long nrOfMsgsReceived = 0;
  -    private long msgSendTime = 0;
  -    private long lastChecked = System.currentTimeMillis();
  +
       private void perfMessageRecvd(long timeSent) {
           nrOfMsgsReceived++;
           msgSendTime+=(System.currentTimeMillis()-timeSent);
           if ( (System.currentTimeMillis() - lastChecked) > 5000 ) {
               log.debug("Calc msg send time total="+msgSendTime+"ms num request="+nrOfMsgsReceived+" average per msg="+(msgSendTime/nrOfMsgsReceived)+"ms.");
           }
  +    }
  +    public boolean getIsJdk13() {
  +        return isJdk13;
  +    }
  +    public void setIsJdk13(boolean isJdk13) {
  +        this.isJdk13 = isJdk13;
       }
   
   }
  
  
  
  1.6       +31 -4     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
  
  Index: SocketSender.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SocketSender.java	15 Dec 2003 22:09:10 -0000	1.5
  +++ SocketSender.java	18 Dec 2003 04:20:15 -0000	1.6
  @@ -82,7 +82,12 @@
       private Socket sc = null;
       private boolean isSocketConnected = false;
       private boolean suspect;
  -    private long ackTimeout = 60*1000;
  +    private long ackTimeout = 5*1000;
  +    private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min
  +    private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting
  +    private long keepAliveConnectTime = 0;
  +    private int keepAliveCount = 0;
  +
   
       public SocketSender(InetAddress host, int port)
       {
  @@ -105,6 +110,8 @@
           sc = new Socket(getAddress(),getPort());
           sc.setSoTimeout((int)ackTimeout);
           isSocketConnected = true;
  +        this.keepAliveCount = 0;
  +        this.keepAliveConnectTime = System.currentTimeMillis();
       }
   
       public void disconnect()
  @@ -129,9 +136,16 @@
        */
       public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException
       {
  +        long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
  +        if ( (ctime > this.keepAliveTimeout) ||
  +             (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) {
  +            disconnect();
  +        }
           if ( !isConnected() ) connect();
           try
           {
  +
  +
               sc.getOutputStream().write(data);
               sc.getOutputStream().flush();
               waitForAck(ackTimeout);
  @@ -144,6 +158,7 @@
               sc.getOutputStream().flush();
               waitForAck(ackTimeout);
           }
  +        this.keepAliveCount++;
       }
   
       private void waitForAck(long timeout)  throws java.io.IOException,
  @@ -169,6 +184,18 @@
   
       public void setSuspect(boolean suspect) {
           this.suspect = suspect;
  +    }
  +    public long getKeepAliveTimeout() {
  +        return keepAliveTimeout;
  +    }
  +    public void setKeepAliveTimeout(long keepAliveTimeout) {
  +        this.keepAliveTimeout = keepAliveTimeout;
  +    }
  +    public int getKeepAliveMaxRequestCount() {
  +        return keepAliveMaxRequestCount;
  +    }
  +    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
  +        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
       }
   
   
  
  
  
  1.3       +13 -9     jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- TcpReplicationThread.java	15 Dec 2003 21:33:06 -0000	1.2
  +++ TcpReplicationThread.java	18 Dec 2003 04:20:15 -0000	1.3
  @@ -167,26 +167,30 @@
           // loop while data available, channel is non-blocking
           while ((count = channel.read (buffer)) > 0) {
               buffer.flip();		// make buffer readable
  -            if (reader.append(buffer.array(),0,count)) {
  +            int pkgcnt = reader.append(buffer.array(),0,count);
  +            while ( pkgcnt > 0 ) {
                   if (synchronous) {
                       sendAck(key,channel);
                   } //end if
  +                pkgcnt--;
               }
               buffer.clear();		// make buffer empty
           }
           //check to see if any data is available
  -        if ( reader.execute() ) {
  +        int pkgcnt = reader.execute();
  +        while ( pkgcnt > 0 ) {
               if (synchronous) {
                   sendAck(key,channel);
  -            }//end if
  -        }//end if
  +            } //end if
  +            pkgcnt--;
  +        }
           if (count < 0) {
               // close channel on EOF, invalidates the key
               channel.close();
               return;
           }
  -        // resume interest in OP_READ
  -        key.interestOps (key.interestOps() | SelectionKey.OP_READ);
  +        // resume interest in OP_READ, OP_WRITE
  +        key.interestOps (key.interestOps() | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
           // cycle the selector so this key is active again
           key.selector().wakeup();
       }
  
  
  
  1.1                  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Jdk13ReplicationListener.java
  
  Index: Jdk13ReplicationListener.java
  ===================================================================
  /*
   * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Jdk13ReplicationListener.java,v 1.1 2003/12/18 04:20:15 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2003/12/18 04:20:15 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  package org.apache.catalina.cluster.tcp;
  
  
  
  
  import java.net.Socket;
  import java.net.ServerSocket;
  import java.net.InetSocketAddress;
  import java.nio.ByteBuffer;
  import java.util.Iterator;
  import org.apache.catalina.cluster.io.ListenCallback;
  import org.apache.catalina.cluster.io.Jdk13ObjectReader;
  import org.apache.catalina.cluster.io.XByteBuffer;
  /**
   */
  public class Jdk13ReplicationListener implements Runnable
  {
  
      private static org.apache.commons.logging.Log log =
          org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
      private ThreadPool pool = null;
      private boolean doListen = false;
      private ListenCallback callback;
      private java.net.InetAddress bind;
      private int port;
      private long timeout = 0;
      private boolean synchronous = false;
      ServerSocket serverSocket = null;
  
      public Jdk13ReplicationListener(ListenCallback callback,
                                 int poolSize,
                                 java.net.InetAddress bind,
                                 int port,
                                 long timeout,
                                 boolean synchronous)
      {
          this.synchronous=synchronous;
          this.callback = callback;
          this.bind = bind;
          this.port = port;
          this.timeout = timeout;
      }
  
      public void run()
      {
          try
          {
              listen();
          }
          catch ( Exception x )
          {
              log.fatal("Unable to start cluster listener.",x);
          }
      }
  
      public void listen ()
          throws Exception
      {
          doListen = true;
          // Get the associated ServerSocket to bind it with
          serverSocket = new ServerSocket();
          serverSocket.bind (new InetSocketAddress (bind,port));
          while (doListen) {
              Socket socket = serverSocket.accept();
              ClusterListenThread t = new ClusterListenThread(socket,new Jdk13ObjectReader(socket,callback));
              t.setDaemon(true);
              t.start();
          }//while
          serverSocket.close();
      }
  
      public void stopListening(){
          doListen = false;
          try {
              serverSocket.close();
          } catch ( Exception x ) {
              log.error("Unable to stop the replication listen socket",x);
          }
      }
  
      protected static class ClusterListenThread extends Thread {
          private Socket socket;
          private Jdk13ObjectReader reader;
          private boolean keepRunning = true;
          private static byte[] ackMsg = new byte[] {6,2,3};
          ClusterListenThread(Socket socket, Jdk13ObjectReader reader) {
              this.socket = socket;
              this.reader = reader;
          }
  
          public void run() {
              try {
                  byte[] buffer = new byte[1024];
                  while (keepRunning) {
                      java.io.InputStream in = socket.getInputStream();
                      int cnt = in.read(buffer);
                      int ack = 0;
                      if ( cnt > 0 ) {
                          ack = reader.append(buffer, 0, cnt);
                      }
                      while ( ack > 0 ) {
                          sendAck();
                          ack--;
                      }
                  }
              } catch ( Exception x ) {
                  keepRunning = false;
                  log.error("Unable to read data from client, disconnecting.",x);
                  try { socket.close(); } catch ( Exception ignore ) {}
              }
          }
  
          private void sendAck() throws java.io.IOException {
              //send a reply-acknowledgement
              socket.getOutputStream().write(ackMsg);
          }
  
      }
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org