You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by bi...@apache.org on 2005/05/14 05:27:19 UTC

cvs commit: jakarta-tomcat-connectors/jk/java/org/apache/jk/common ChannelNioSocket.java

billbarker    2005/05/13 20:27:19

  Modified:    jk/java/org/apache/jk/common ChannelNioSocket.java
  Log:
  Checking in some stuff I've had here, before I start to fix Jk-Coyote so that it has a hope of working with Mark's patch.
  
  Now this uses direct ByteBuffers (makes a big difference :).
  
  Added a nioIsBroken flag which, if true, seems to work around the NIO bugs in the Windows implementation.
  
  Solaris performance is actually pretty close to that of ChannelSocket now.  Windows is really slow.  Of course, ChannelNioSocket would need a lot more testing on more platforms before I'm willing to lift the 'experimental' label (and I still haven't found a case where it's better than ChannelSocket :).
  
  Revision  Changes    Path
  1.3       +164 -91   jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelNioSocket.java
  
  Index: ChannelNioSocket.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelNioSocket.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ChannelNioSocket.java	24 Apr 2005 04:27:42 -0000	1.2
  +++ ChannelNioSocket.java	14 May 2005 03:27:19 -0000	1.3
  @@ -99,6 +99,7 @@
       boolean tcpNoDelay=true; // nodelay to true by default
       int linger=100;
       int socketTimeout = 0;
  +    boolean nioIsBroken = false;
       private Selector selector = null;
   
       long requestCount=0;
  @@ -241,15 +242,15 @@
           tp.setMaxThreads(i);
       }
       
  -        public void setMinSpareThreads( int i ) {
  +    public void setMinSpareThreads( int i ) {
           if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
  -            tp.setMinSpareThreads(i);
  -        }
  +        tp.setMinSpareThreads(i);
  +    }
   
  -        public void setMaxSpareThreads( int i ) {
  +    public void setMaxSpareThreads( int i ) {
           if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
  -            tp.setMaxSpareThreads(i);
  -        }
  +        tp.setMaxSpareThreads(i);
  +    }
   
       public int getMaxThreads() {
           return tp.getMaxThreads();   
  @@ -266,6 +267,13 @@
       public void setBacklog(int i) {
       }
       
  +    public void setNioIsBroken(boolean nib) {
  +        nioIsBroken = nib;
  +    }
  +
  +    public boolean getNioIsBroken() {
  +        return nioIsBroken;
  +    }
       
       /* ==================== ==================== */
       ServerSocket sSocket;
  @@ -278,7 +286,6 @@
       public void pause() throws Exception {
           synchronized(this) {
               paused = true;
  -            //unLockSocket();
           }
       }
   
  @@ -501,17 +508,6 @@
           
           if(log.isTraceEnabled() )
               log.trace("send() " + len + " " + buf[4] );
  -        if(buf[4] == HandlerRequest.JK_AJP13_END_RESPONSE ) {
  -            // After this goes out, the client may send a new request
  -            // before the thread finishes, so tell the Poller that the
  -            // next read is new
  -            Socket s = (Socket)ep.getNote(socketNote);
  -            SelectionKey key = s.getChannel().keyFor(selector);
  -            if(key != null) {
  -                SocketConnection sc = (SocketConnection)key.attachment();
  -                sc.setFinished();
  -            }
  -        }
   
           OutputStream os=(OutputStream)ep.getNote( osNote );
           os.write( buf, 0, len );
  @@ -528,8 +524,8 @@
       public int receive( Msg msg, MsgContext ep )
           throws IOException
       {
  -        if (log.isDebugEnabled()) {
  -            log.debug("receive() ");
  +        if (log.isTraceEnabled()) {
  +            log.trace("receive() ");
           }
   
           byte buf[]=msg.getBuffer();
  @@ -636,8 +632,6 @@
       /** Accept incoming connections, dispatch to the thread pool
        */
       void acceptConnections() {
  -        if( log.isDebugEnabled() )
  -            log.debug("Accepting ajp connections on " + port);
           if( running ) {
               try{
                   MsgContext ep=new MsgContext();
  @@ -674,8 +668,8 @@
               return flush( msg, ep );
           }
   
  -        if( log.isDebugEnabled() )
  -            log.debug("Call next " + type + " " + next);
  +        if( log.isTraceEnabled() )
  +            log.trace("Call next " + type + " " + next);
   
           // Send notification
           if( nSupport!=null ) {
  @@ -788,7 +782,6 @@
               this.ep=ep;
           }
   
  -
           public Object[] getInitData() {
               return null;
           }
  @@ -797,14 +790,13 @@
               if(!processConnection(ep)) {
                   unregister(ep);
               }
  -            setFinished();
           }
   
           public boolean isRunning() {
               return inProgress;
           }
   
  -        public synchronized void setFinished() {
  +        public  void setFinished() {
               inProgress = false;
           }
   
  @@ -812,26 +804,41 @@
            */
           boolean processConnection(MsgContext ep) {
               try {
  -                if( !running || paused ) {
  -                    return false;
  -                }
  -                int status= receive( recv, ep );
  -                if( status <= 0 ) {
  -                    if( status==-3)
  -                        log.debug( "server has been restarted or reset this connection" );
  -                    else 
  -                        log.warn("Closing ajp connection " + status );
  -                    return false;
  -                }
  -                ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
  -                
  -                ep.setType( 0 );
  -                // Will call next
  -                status= invoke( recv, ep );
  -                if( status != JkHandler.OK ) {
  -                    log.warn("processCallbacks status " + status );
  -                    return false;
  -                }
  +                InputStream sis = (InputStream)ep.getNote(isNote);
  +                boolean haveInput = true;
  +                while(haveInput) {
  +                    if( !running || paused ) {
  +                        return false;
  +                    }
  +                    int status= receive( recv, ep );
  +                    if( status <= 0 ) {
  +                        if( status==-3)
  +                            log.debug( "server has been restarted or reset this connection" );
  +                        else 
  +                            log.warn("Closing ajp connection " + status );
  +                        return false;
  +                    }
  +                    ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
  +                    
  +                    ep.setType( 0 );
  +                    // Will call next
  +                    status= invoke( recv, ep );
  +                    if( status != JkHandler.OK ) {
  +                        log.warn("processCallbacks status " + status );
  +                        return false;
  +                    }
  +                    synchronized(this) {
  +                        synchronized(sis) {
  +                            haveInput = sis.available() > 0;
  +                        }
  +                        if(!haveInput) {
  +                            setFinished();
  +                        } else {
  +                            if(log.isDebugEnabled())
  +                                log.debug("KeepAlive: "+sis.available());
  +                        }
  +                    }
  +                } 
               } catch( Exception ex ) {
                   String msg = ex.getMessage();
                   if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
  @@ -845,20 +852,24 @@
               return true;
           }
   
  -        synchronized void process(SelectionKey sk) {
  +        synchronized void  process(SelectionKey sk) {
               if(!sk.isValid()) {
                   return;
               }
               if(sk.isReadable()) {
  +                SocketInputStream sis = (SocketInputStream)ep.getNote(isNote);
  +                boolean isok = sis.readAvailable();
                   if(!inProgress) {
  -                    inProgress = true;
  -                    tp.runIt(this);
  -                } else {
  -                    Object is = ep.getNote(isNote);
  -                    synchronized(is) {
  -                        is.notify();
  +                    if(isok) {
  +                        if(sis.available() > 0 || !nioIsBroken){
  +                            inProgress = true;
  +                            tp.runIt(this);
  +                        }
  +                    } else {
  +                        unregister(ep);
  +                        return;
                       }
  -                }
  +                } 
               }
               if(sk.isWritable()) {
                   Object os = ep.getNote(osNote);
  @@ -867,7 +878,6 @@
                   }
               }
           }
  -               
   
           synchronized void unregister(MsgContext ep) {
               try{
  @@ -913,11 +923,9 @@
           public void runIt(Object perTh[]) {
               while(running) {
                   try {
  -                    if(log.isTraceEnabled())
  -                        log.trace("Attempting to select "+selector.keys().size()+" items");
  -                    int ns = selector.select();
  -                    if(log.isTraceEnabled())
  -                        log.trace("Selecting "+ns+" channels");
  +                    int ns = selector.select(serverTimeout);
  +                    if(log.isDebugEnabled())
  +                        log.debug("Selecting "+ns+" channels");
                       if(ns > 0) {
                           Set sels = selector.selectedKeys();
                           Iterator it = sels.iterator();
  @@ -952,8 +960,11 @@
   
       protected class SocketInputStream extends InputStream {
           final int BUFFER_SIZE = 8200;
  -        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
  -        SocketChannel channel;
  +        private ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
  +        private SocketChannel channel;
  +        private boolean blocking = false;
  +        private boolean isClosed = false;
  +        private volatile boolean dataAvailable = false;
   
           SocketInputStream(SocketChannel channel) {
               this.channel = channel;
  @@ -976,37 +987,45 @@
               buffer.reset();
           }
   
  -        public int read() throws IOException {
  +        public synchronized int read() throws IOException {
               if(!checkAvailable(1)) {
  -                if(fill(1) < 0) {
  -                    return -1;
  -                }
  +                block(1);
               }
               return buffer.get();
           }
   
           private boolean checkAvailable(int nbyte) throws IOException {
  +            if(isClosed) {
  +                throw new ClosedChannelException();
  +            }
               return buffer.remaining() >=  nbyte;
           }
   
  -        private synchronized int fill(int nbyte) throws IOException {
  +        private int fill(int nbyte) throws IOException {
               int rem = nbyte;
               int read = 0;
               boolean eof = false;
  +            byte [] oldData = null;
  +            if(buffer.remaining() > 0) {
  +                // should rarely happen, so short-lived GC shouldn't hurt
  +                // as much as allocating a long-lived buffer for this
  +                if(log.isDebugEnabled())
  +                    log.debug("Saving old buffer: "+buffer.remaining());
  +                oldData = new byte[buffer.remaining()];
  +                buffer.get(oldData);
  +            }
               buffer.clear();
  +            if(oldData != null) {
  +                buffer.put(oldData);
  +            }
               while(rem > 0) {
                   int count = channel.read(buffer);
                   if(count < 0) {
                       eof = true;
                       break;
                   } else if(count == 0) {
  -                    if(log.isTraceEnabled()) 
  -                        log.trace("Blocking Read for "+rem+" bytes");
  -                    try {
  -                        wait();
  -                    }catch(InterruptedException iex) {
  -                        // ignore since can't happen
  -                    }
  +                    log.debug("Failed to recieve signaled read: ");
  +                    break;
                   }
                   read += count;
                   rem -= count;
  @@ -1015,31 +1034,82 @@
               return eof ? -1 : read;
           }
   
  +        synchronized boolean readAvailable() {
  +            if(blocking) {
  +                dataAvailable = true;
  +                notify();
  +            } else if(dataAvailable) {
  +                log.debug("Race Condition");
  +            } else {
  +                int nr=0;
  +
  +                try {
  +                    nr = fill(1);
  +                } catch(ClosedChannelException cce) {
  +                    log.debug("Channel is closed",cce);
  +                    nr = -1;
  +                } catch(IOException iex) {
  +                    log.warn("Exception processing read",iex);
  +                }
  +                if(nr < 0) {
  +                    isClosed = true;
  +                    notify();
  +                    return false;
  +                } else if(nr == 0) {
  +                    if(!nioIsBroken) {
  +                        dataAvailable = (buffer.remaining() <= 0);
  +                    }
  +                }
  +            }
  +            return true;
  +        }
  +
           public int read(byte [] data) throws IOException {
               return read(data, 0, data.length);
           }
   
  -        public int read(byte [] data, int offset, int len) throws IOException {
  +        public synchronized int read(byte [] data, int offset, int len) throws IOException {
               int olen = len;
  -            if(!checkAvailable(len)) {
  +            while(!checkAvailable(len)) {
                   int avail = buffer.remaining();
                   if(avail > 0) {
                       buffer.get(data, offset, avail);
                   }
                   len -= avail;
                   offset += avail;
  -                if(fill(len) < 0) {
  -                    return avail > 0 ? avail : -1;
  -                }
  +                block(len);
               }
               buffer.get(data, offset, len);
               return olen;
           }
  +
  +        private void block(int len) throws IOException {
  +            if(len <= 0) {
  +                return;
  +            }
  +            if(!dataAvailable) {
  +                blocking = true;
  +                if(log.isDebugEnabled())
  +                    log.debug("Waiting for "+len+" bytes to be available");
  +                try{
  +                    wait(socketTimeout);
  +                }catch(InterruptedException iex) {
  +                    log.debug("Interrupted",iex);
  +                }
  +                blocking = false;
  +            }
  +            if(dataAvailable) {
  +                dataAvailable = false;
  +                if(fill(len) < 0) {
  +                    isClosed = true;
  +                } 
  +            }
  +        }
       }
   
       protected class SocketOutputStream extends OutputStream {
           final int BUFFER_SIZE = 8200;
  -        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
  +        ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
           SocketChannel channel;
   
           SocketOutputStream(SocketChannel channel) {
  @@ -1064,27 +1134,30 @@
               buffer.put(data, offset, len);
           }
   
  -        public synchronized void flush() throws IOException {
  +        public void flush() throws IOException {
               buffer.flip();
               while(buffer.hasRemaining()) {
                   int count = channel.write(buffer);
                   if(count == 0) {
  -                    SelectionKey key = channel.keyFor(selector);
  -                    key.interestOps(SelectionKey.OP_WRITE);
  -                    log.debug("Blocking for channel write: "+buffer.remaining());
  -                    try {
  -                        wait();
  -                    } catch(InterruptedException iex) {
  -                        // ignore, since can't happen
  +                    synchronized(this) {
  +                        SelectionKey key = channel.keyFor(selector);
  +                        key.interestOps(SelectionKey.OP_WRITE);
  +                        if(log.isDebugEnabled())
  +                            log.debug("Blocking for channel write: "+buffer.remaining());
  +                        try {
  +                            wait();
  +                        } catch(InterruptedException iex) {
  +                            // ignore, since can't happen
  +                        }
  +                        key.interestOps(SelectionKey.OP_READ);
                       }
  -                    key.interestOps(SelectionKey.OP_READ);
                   }
               }
               buffer.clear();
           }
   
           private boolean checkAvailable(int len) {
  -            return buffer.remaining() > len;
  +            return buffer.remaining() >= len;
           }
       }
   
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org