Posted to dev@tomcat.apache.org by bi...@apache.org on 2005/04/24 06:27:42 UTC

cvs commit: jakarta-tomcat-connectors/jk/java/org/apache/jk/common ChannelNioSocket.java

billbarker    2005/04/23 21:27:42

  Modified:    jk/java/org/apache/jk/common ChannelNioSocket.java
  Log:
  Give up on switching between blocking/non-blocking Sockets, also move the Accept into the Poller instead of its own thread.
  
  This is still very much experimental, and nobody should even dream of using it in production.
  
  Testing on Windows, it's very flakey.  On Solaris, it's stable enough, but ChannelSocket is about 25% faster.
  
  Revision  Changes    Path
  1.2       +231 -80   jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelNioSocket.java
  
  Index: ChannelNioSocket.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelNioSocket.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ChannelNioSocket.java	17 Apr 2005 03:41:08 -0000	1.1
  +++ ChannelNioSocket.java	24 Apr 2005 04:27:42 -0000	1.2
  @@ -18,19 +18,22 @@
   
   import java.util.Set;
   import java.util.Iterator;
  -import java.io.BufferedInputStream;
  -import java.io.BufferedOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.OutputStream;
  +import java.nio.ByteBuffer;
   import java.nio.channels.Selector;
   import java.nio.channels.SelectionKey;
  +import java.nio.channels.SocketChannel;
   import java.nio.channels.ClosedSelectorException;
  +import java.nio.channels.ServerSocketChannel;
  +import java.nio.channels.CancelledKeyException;
  +import java.nio.channels.ClosedChannelException;
   import java.net.URLEncoder;
   import java.net.InetAddress;
  +import java.net.InetSocketAddress;
   import java.net.ServerSocket;
   import java.net.Socket;
  -import java.net.SocketException;
   
   import javax.management.ListenerNotFoundException;
   import javax.management.MBeanNotificationInfo;
  @@ -92,10 +95,10 @@
       int maxPort=8019; // 0 for backward compat.
       int port=startPort;
       InetAddress inet;
  -    int serverTimeout;
  +    int serverTimeout = 0;
       boolean tcpNoDelay=true; // nodelay to true by default
       int linger=100;
  -    int socketTimeout;
  +    int socketTimeout = 0;
       private Selector selector = null;
   
       long requestCount=0;
  @@ -105,7 +108,6 @@
          flush() is honored ( on my test, I got 367->433 RPS and
          52->35ms average time with a simple servlet )
       */
  -    static final boolean BUFFER_WRITE=false;
       
       ThreadPool tp=ThreadPool.createThreadPool(true);
   
  @@ -271,12 +273,12 @@
       final int isNote=2;
       final int osNote=3;
       final int notifNote=4;
  -    boolean paused = true;
  +    boolean paused = false;
   
       public void pause() throws Exception {
           synchronized(this) {
               paused = true;
  -            unLockSocket();
  +            //unLockSocket();
           }
       }
   
  @@ -299,10 +301,11 @@
                   }
               }
           }
  -        Socket s=sSocket.accept();
  +        SocketChannel sc=sSocket.getChannel().accept();
  +        Socket s = sc.socket();
           ep.setNote( socketNote, s );
           if(log.isDebugEnabled() )
  -            log.debug("Accepted socket " + s );
  +            log.debug("Accepted socket " + s +" channel "  + sc.isBlocking());
           if( linger > 0 )
               s.setSoLinger( true, linger);
           if( socketTimeout > 0 ) 
  @@ -312,12 +315,9 @@
           
           requestCount++;
   
  -        InputStream is=new BufferedInputStream(s.getInputStream());
  -        OutputStream os;
  -        if( BUFFER_WRITE )
  -            os = new BufferedOutputStream( s.getOutputStream());
  -        else
  -            os = s.getOutputStream();
  +        sc.configureBlocking(false);
  +        InputStream is=new SocketInputStream(sc);
  +        OutputStream os = new SocketOutputStream(sc);
           ep.setNote( isNote, is );
           ep.setNote( osNote, os );
           ep.setControl( tp );
  @@ -349,19 +349,24 @@
           }
           if (maxPort < startPort)
               maxPort = startPort;
  +        ServerSocketChannel ssc = ServerSocketChannel.open();
  +        ssc.configureBlocking(false);
           for( int i=startPort; i<=maxPort; i++ ) {
               try {
  +                InetSocketAddress iddr = null;
                   if( inet == null ) {
  -                    sSocket = new ServerSocket( i, 0 );
  +                    iddr = new InetSocketAddress( i);
                   } else {
  -                    sSocket=new ServerSocket( i, 0, inet );
  +                    iddr=new InetSocketAddress( inet, i);
                   }
  +                sSocket = ssc.socket();
  +                sSocket.bind(iddr);
                   port=i;
                   break;
               } catch( IOException ex ) {
                   if(log.isInfoEnabled())
                       log.info("Port busy " + i + " " + ex.toString());
  -                continue;
  +                sSocket = null;
               }
           }
   
  @@ -373,6 +378,7 @@
               log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
   
           selector = Selector.open();
  +        ssc.register(selector, SelectionKey.OP_ACCEPT);
           // If this is not the base port and we are the 'main' channleSocket and
           // SHM didn't already set the localId - we'll set the instance id
           if( "channelNioSocket".equals( name ) &&
  @@ -380,8 +386,6 @@
               (wEnv.getLocalId()==0) ) {
               wEnv.setLocalId(  port - startPort );
           }
  -        if( serverTimeout > 0 )
  -            sSocket.setSoTimeout( serverTimeout );
   
           // XXX Reverse it -> this is a notification generator !!
           if( next==null && wEnv!=null ) {
  @@ -415,9 +419,8 @@
           }
   
           tp.start();
  -        SocketAcceptor acceptAjp=new SocketAcceptor(   );
  -        tp.runIt( acceptAjp);
  -
  +        Poller pollAjp = new Poller();
  +        tp.runIt(pollAjp);
       }
   
       ObjectName tpOName;
  @@ -428,8 +431,6 @@
       public void start() throws IOException{
           if( sSocket==null )
               init();
  -        Poller pollAjp = new Poller();
  -        tp.runIt(pollAjp);
           resume();
       }
   
  @@ -460,24 +461,10 @@
       
       public void close(MsgContext ep) throws IOException {
           Socket s=(Socket)ep.getNote( socketNote );
  -        s.close();
  -    }
  -
  -    private void unLockSocket() throws IOException {
  -        // Need to create a connection to unlock the accept();
  -        Socket s;
  -        InetAddress ladr = inet;
  -
  -        if(port == 0)
  -            return;
  -        if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
  -            ladr = InetAddress.getLocalHost();
  +        SelectionKey key = s.getChannel().keyFor(selector);
  +        if(key != null) {
  +            key.cancel();
           }
  -        s=new Socket(ladr, port );
  -        // setting soLinger to a small value will help shutdown the
  -        // connection quicker
  -        s.setSoLinger(true, 0);
  -
           s.close();
       }
   
  @@ -489,12 +476,8 @@
                   return;
               tp.shutdown();
   
  -            if(!paused) {
  -                unLockSocket();
  -            }
  -
  +            selector.wakeup().close();
               sSocket.close(); // XXX?
  -            selector.close();
               
               if( tpOName != null )  {
                   Registry.getRegistry(null, null).unregisterComponent(tpOName);
  @@ -518,19 +501,27 @@
           
           if(log.isTraceEnabled() )
               log.trace("send() " + len + " " + buf[4] );
  +        if(buf[4] == HandlerRequest.JK_AJP13_END_RESPONSE ) {
  +            // After this goes out, the client may send a new request
  +            // before the thread finishes, so tell the Poller that the
  +            // next read is new
  +            Socket s = (Socket)ep.getNote(socketNote);
  +            SelectionKey key = s.getChannel().keyFor(selector);
  +            if(key != null) {
  +                SocketConnection sc = (SocketConnection)key.attachment();
  +                sc.setFinished();
  +            }
  +        }
   
           OutputStream os=(OutputStream)ep.getNote( osNote );
           os.write( buf, 0, len );
  +        os.flush();
           return len;
       }
   
       public int flush( Msg msg, MsgContext ep)
           throws IOException
       {
  -        if( BUFFER_WRITE ) {
  -            OutputStream os=(OutputStream)ep.getNote( osNote );
  -            os.flush();
  -        }
           return 0;
       }
   
  @@ -613,7 +604,7 @@
           while(pos < len) {
               try {
                   got = is.read(b, pos + offset, len - pos);
  -            } catch(SocketException sex) {
  +            } catch(ClosedChannelException sex) {
                   if(pos > 0) {
                       log.info("Error reading data after "+pos+"bytes",sex);
                   } else {
  @@ -647,20 +638,20 @@
       void acceptConnections() {
           if( log.isDebugEnabled() )
               log.debug("Accepting ajp connections on " + port);
  -        while( running ) {
  +        if( running ) {
               try{
                   MsgContext ep=new MsgContext();
                   ep.setSource(this);
                   ep.setWorkerEnv( wEnv );
                   this.accept(ep);
   
  -                if( !running ) break;
  +                if( !running ) return;
                   
                   // Since this is a long-running connection, we don't care
                   // about the small GC
                   SocketConnection ajpConn=
                       new SocketConnection( ep);
  -                tp.runIt( ajpConn );
  +                ajpConn.register(ep);
               }catch(Exception ex) {
                   if (running)
                       log.warn("Exception executing accept" ,ex);
  @@ -788,21 +779,6 @@
           return notifInfo;
       }
   
  -    protected class SocketAcceptor implements ThreadPoolRunnable {
  -
  -    
  -        SocketAcceptor( ) {
  -        }
  -
  -        public Object[] getInitData() {
  -            return null;
  -        }
  -
  -        public void runIt(Object thD[]) {
  -            acceptConnections();
  -        }
  -    }
  -
       protected class SocketConnection implements ThreadPoolRunnable {
           MsgContext ep;
           MsgAjp recv = new MsgAjp();
  @@ -818,19 +794,20 @@
           }
       
           public void runIt(Object perTh[]) {
  -            inProgress = true;
  -            if(processConnection(ep)) {
  -                register(ep);
  -            } else {
  +            if(!processConnection(ep)) {
                   unregister(ep);
               }
  -            inProgress = false;
  +            setFinished();
           }
   
           public boolean isRunning() {
               return inProgress;
           }
   
  +        public synchronized void setFinished() {
  +            inProgress = false;
  +        }
  +
           /** Process a single ajp connection.
            */
           boolean processConnection(MsgContext ep) {
  @@ -868,7 +845,31 @@
               return true;
           }
   
  -        void unregister(MsgContext ep) {
  +        synchronized void process(SelectionKey sk) {
  +            if(!sk.isValid()) {
  +                return;
  +            }
  +            if(sk.isReadable()) {
  +                if(!inProgress) {
  +                    inProgress = true;
  +                    tp.runIt(this);
  +                } else {
  +                    Object is = ep.getNote(isNote);
  +                    synchronized(is) {
  +                        is.notify();
  +                    }
  +                }
  +            }
  +            if(sk.isWritable()) {
  +                Object os = ep.getNote(osNote);
  +                synchronized(os) {
  +                    os.notify();
  +                }
  +            }
  +        }
  +               
  +
  +        synchronized void unregister(MsgContext ep) {
               try{
                   close(ep);
               } catch(Exception e) {
  @@ -912,6 +913,8 @@
           public void runIt(Object perTh[]) {
               while(running) {
                   try {
  +                    if(log.isTraceEnabled())
  +                        log.trace("Attempting to select "+selector.keys().size()+" items");
                       int ns = selector.select();
                       if(log.isTraceEnabled())
                           log.trace("Selecting "+ns+" channels");
  @@ -920,22 +923,170 @@
                           Iterator it = sels.iterator();
                           while(it.hasNext()) {
                               SelectionKey sk = (SelectionKey)it.next();
  -                            SocketConnection sc = (SocketConnection)sk.attachment();
                               if(sk.isValid()) {
  -                                sk.cancel(); // somebody else's problem now
  -                                tp.runIt(sc);
  +                                if(sk.isAcceptable()) {
  +                                    acceptConnections();
  +                                } else {
  +                                    SocketConnection sc = (SocketConnection)sk.attachment();
  +                                    sc.process(sk);
  +                                }
  +                            } else {
  +                                sk.cancel();
                               }
  +                            it.remove();
                           }
                       }
                   } catch(ClosedSelectorException cse) {
                       log.debug("Selector is closed");
                       return;
  +                } catch(CancelledKeyException cke) {
  +                    log.debug("Key Cancelled", cke);
                   } catch(IOException iex) {
                       log.warn("IO Error in select",iex);
  +                } catch(Exception ex) {
  +                    log.warn("Error processing select",ex);
                   }
               }
           }
       }
   
  +    protected class SocketInputStream extends InputStream {
  +        final int BUFFER_SIZE = 8200;
  +        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
  +        SocketChannel channel;
  +
  +        SocketInputStream(SocketChannel channel) {
  +            this.channel = channel;
  +            buffer.limit(0);
  +        }
  +
  +        public int available() {
  +            return buffer.remaining();
  +        }
  +
  +        public void mark(int readlimit) {
  +            buffer.mark();
  +        }
  +
  +        public boolean markSupported() {
  +            return true;
  +        }
  +
  +        public void reset() {
  +            buffer.reset();
  +        }
  +
  +        public int read() throws IOException {
  +            if(!checkAvailable(1)) {
  +                if(fill(1) < 0) {
  +                    return -1;
  +                }
  +            }
  +            return buffer.get();
  +        }
  +
  +        private boolean checkAvailable(int nbyte) throws IOException {
  +            return buffer.remaining() >=  nbyte;
  +        }
  +
  +        private synchronized int fill(int nbyte) throws IOException {
  +            int rem = nbyte;
  +            int read = 0;
  +            boolean eof = false;
  +            buffer.clear();
  +            while(rem > 0) {
  +                int count = channel.read(buffer);
  +                if(count < 0) {
  +                    eof = true;
  +                    break;
  +                } else if(count == 0) {
  +                    if(log.isTraceEnabled()) 
  +                        log.trace("Blocking Read for "+rem+" bytes");
  +                    try {
  +                        wait();
  +                    }catch(InterruptedException iex) {
  +                        // ignore since can't happen
  +                    }
  +                }
  +                read += count;
  +                rem -= count;
  +            }
  +            buffer.flip();
  +            return eof ? -1 : read;
  +        }
  +
  +        public int read(byte [] data) throws IOException {
  +            return read(data, 0, data.length);
  +        }
  +
  +        public int read(byte [] data, int offset, int len) throws IOException {
  +            int olen = len;
  +            if(!checkAvailable(len)) {
  +                int avail = buffer.remaining();
  +                if(avail > 0) {
  +                    buffer.get(data, offset, avail);
  +                }
  +                len -= avail;
  +                offset += avail;
  +                if(fill(len) < 0) {
  +                    return avail > 0 ? avail : -1;
  +                }
  +            }
  +            buffer.get(data, offset, len);
  +            return olen;
  +        }
  +    }
  +
  +    protected class SocketOutputStream extends OutputStream {
  +        final int BUFFER_SIZE = 8200;
  +        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
  +        SocketChannel channel;
  +
  +        SocketOutputStream(SocketChannel channel) {
  +            this.channel = channel;
  +        }
  +
  +        public void write(int b) throws IOException {
  +            if(!checkAvailable(1)) {
  +                flush();
  +            }
  +            buffer.put((byte)b);
  +        }
  +
  +        public void write(byte [] data) throws IOException {
  +            write(data, 0, data.length);
  +        }
  +
  +        public void write(byte [] data, int offset, int len) throws IOException {
  +            if(!checkAvailable(len)) {
  +                flush();
  +            }
  +            buffer.put(data, offset, len);
  +        }
  +
  +        public synchronized void flush() throws IOException {
  +            buffer.flip();
  +            while(buffer.hasRemaining()) {
  +                int count = channel.write(buffer);
  +                if(count == 0) {
  +                    SelectionKey key = channel.keyFor(selector);
  +                    key.interestOps(SelectionKey.OP_WRITE);
  +                    log.debug("Blocking for channel write: "+buffer.remaining());
  +                    try {
  +                        wait();
  +                    } catch(InterruptedException iex) {
  +                        // ignore, since can't happen
  +                    }
  +                    key.interestOps(SelectionKey.OP_READ);
  +                }
  +            }
  +            buffer.clear();
  +        }
  +
  +        private boolean checkAvailable(int len) {
  +            return buffer.remaining() > len;
  +        }
  +    }
  +
   }
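
The central idea of the change above (registering the listening channel for OP_ACCEPT on the same Selector that watches the client channels, so that one poller thread handles both) can be sketched in isolation. This is a minimal illustration, not the committed code: the class and method names (AcceptInPollerSketch, runOnce) are hypothetical, it uses Java 7+ conveniences (try-with-resources, ServerSocketChannel.bind) that the 2005 code could not, and it reads directly on the poller thread rather than handing work to a thread pool.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class; not part of ChannelNioSocket.
class AcceptInPollerSketch {

    /** Accepts one loopback client inside the select loop and returns what it sent. */
    static String runOnce(String payload) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(loopback, 0)); // port 0: any free port
            // The listening channel shares the selector with client channels.
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) ssc.getLocalAddress()).getPort();

            try (SocketChannel client =
                     SocketChannel.open(new InetSocketAddress(loopback, port))) {
                client.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.US_ASCII)));
                client.shutdownOutput(); // signals end-of-stream to the server side

                StringBuilder received = new StringBuilder();
                ByteBuffer buf = ByteBuffer.allocate(8192);
                boolean done = false;
                while (!done) {
                    selector.select();
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        it.remove(); // selected keys must be removed by hand
                        if (!sk.isValid()) {
                            continue;
                        }
                        if (sk.isAcceptable()) {
                            SocketChannel sc = ssc.accept(); // may be null when non-blocking
                            if (sc != null) {
                                sc.configureBlocking(false);
                                sc.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (sk.isReadable()) {
                            SocketChannel sc = (SocketChannel) sk.channel();
                            buf.clear();
                            int n = sc.read(buf);
                            if (n < 0) {          // peer closed its output
                                sk.cancel();
                                sc.close();
                                done = true;
                            } else {
                                buf.flip();
                                received.append(StandardCharsets.US_ASCII.decode(buf));
                            }
                        }
                    }
                }
                return received.toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runOnce("ping"));
    }
}
```

Removing each key from the selected set after handling it matters here for the same reason as the it.remove() added in the diff: the selector never clears that set itself, so a stale key would be processed again on the next pass.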
   
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org


Re: cvs commit: jakarta-tomcat-connectors/jk/java/org/apache/jk/common ChannelNioSocket.java

Posted by Remy Maucherat <re...@apache.org>.
Bill Barker wrote:
>> Does it do about the same of behavior as the APR endpoint ? (I didn't 
>> try it - I run Windoze for my dev, BTW ;) )
> 
> ChannelNioSocket works ok on Windows on low concurrency, but what's the 
> point? ;-).  After that, NIO starts throwing weird NPEs.
> 
> I haven't tried the APR endpoint on Solaris yet.  If I get some time, 
> I'll try compiling it and give it a spin.  However the NIO tests suggest 
> that PoolTcpEndpoint will win. There is just not much downside to simply 
> increasing maxThreads on Solaris, but the additional syncs and 
> context-switching do cost.

That's a good question. On Windows, raw throughput (ab on localhost, I 
suck ...) is slower, but any kind of real test seems more or less 
equivalent, with sendfile bringing in gains. I don't know if this counts 
as real testing (probably not).
Linux is faster according to Mladen.
Solaris is untested, but would be interesting indeed.

>> I would like AJP and HTTP to share more code. Do you think that NIO is 
>> a better choice, or should it remain an experiment ? To me, it seems 
>> less mature, robust and portable (the most annoying problem probably 
>> being that to get bugfixes and feature updates, you need to upgrade 
>> the JVM), but I don't have a whole lot of experience.
> 
> Most of the split was because Costin designed Jk-Coyote around the JNI 
> stuff. Also, at the time PoolTcpEndpoint didn't support the master-slave 
> model he settled on.
> 
> Actually, I'm thinking of leaving ChannelNioSocket in mostly to have 
> something to point to when people show up on the list asking "why 
> doesn't Tomcat use this great NIO stuff?". ;-)  The non-blocking io 
> actually works well for AJP (since all of the reads and writes are at 
> most 8K).  Using ByteBuffer vs byte [] seems mostly a matter of taste 
> :).  Otherwise, I agree that NIO should probably remain an experiment 
> for now.

Of course, it's a good comparison, especially since it was done quickly.

Rémy



Re: cvs commit: jakarta-tomcat-connectors/jk/java/org/apache/jk/common ChannelNioSocket.java

Posted by Bill Barker <wb...@wilshire.com>.
----- Original Message ----- 
From: "Remy Maucherat" <re...@apache.org>
To: "Tomcat Developers List" <to...@jakarta.apache.org>
Sent: Sunday, April 24, 2005 1:49 AM
Subject: Re: cvs commit: 
jakarta-tomcat-connectors/jk/java/org/apache/jk/common ChannelNioSocket.java


>billbarker@apache.org wrote:
>> billbarker    2005/04/23 21:27:42
>>
>> Modified:    jk/java/org/apache/jk/common ChannelNioSocket.java Log: Give 
>> up on switching between blocking/non-blocking Sockets, also move
>> the Accept into the Poller instead of its own thread.
>>
>> This is still very much experimental, and nobody should even dream of
>> using it in production.
>>
>> Testing on Windows, it's very flakey.  On Solaris, it's stable
>> enough, but ChannelSocket is about 25% faster.
>
>Does it do about the same of behavior as the APR endpoint ? (I didn't try 
>it - I run Windoze for my dev, BTW ;) )
>

ChannelNioSocket works ok on Windows on low concurrency, but what's the 
point? ;-).  After that, NIO starts throwing weird NPEs.

I haven't tried the APR endpoint on Solaris yet.  If I get some time, I'll 
try compiling it and give it a spin.  However the NIO tests suggest that 
PoolTcpEndpoint will win. There is just not much downside to simply 
increasing maxThreads on Solaris, but the additional syncs and 
context-switching do cost.

>I would like AJP and HTTP to share more code. Do you think that NIO is a 
>better choice, or should it remain an experiment ? To me, it seems less 
>mature, robust and portable (the most annoying problem probably being that 
>to get bugfixes and feature updates, you need to upgrade the JVM), but I 
>don't have a whole lot of experience.
>

Most of the split was because Costin designed Jk-Coyote around the JNI 
stuff. Also, at the time PoolTcpEndpoint didn't support the master-slave 
model he settled on.

Actually, I'm thinking of leaving ChannelNioSocket in mostly to have 
something to point to when people show up on the list asking "why doesn't 
Tomcat use this great NIO stuff?". ;-)  The non-blocking io actually works 
well for AJP (since all of the reads and writes are at most 8K).  Using 
ByteBuffer vs byte [] seems mostly a matter of taste :).  Otherwise, I agree 
that NIO should probably remain an experiment for now.
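
To illustrate the point about bounded reads: because a packet never exceeds a known size, a single reusable ByteBuffer is enough, and copying out to a byte [] at the end is cheap. The sketch below is an assumption-laden illustration, not code from ChannelNioSocket: the names (FixedBufferReadSketch, readFully, MAX_PACKET) are hypothetical, the 8K limit is taken from the remark above, and the demo uses a blocking Pipe so the loop does not spin (a non-blocking channel would need a selector or wait/notify between reads, as in the commit).

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;

// Hypothetical demo class; names are for illustration only.
class FixedBufferReadSketch {
    // Assumed upper bound on a single read, per "at most 8K".
    static final int MAX_PACKET = 8 * 1024;

    /** Reads exactly len bytes through a reusable buffer and returns them as a byte[]. */
    static byte[] readFully(ReadableByteChannel ch, ByteBuffer buf, int len)
            throws IOException {
        if (len < 0 || len > buf.capacity()) {
            throw new IllegalArgumentException("len out of range: " + len);
        }
        buf.clear();      // position = 0, limit = capacity
        buf.limit(len);   // never read past the requested length
        while (buf.hasRemaining()) {
            if (ch.read(buf) < 0) {
                throw new EOFException("stream ended after " + buf.position() + " bytes");
            }
        }
        buf.flip();       // switch from filling to draining
        byte[] out = new byte[len];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open(); // blocking by default
        pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5}));
        ByteBuffer reusable = ByteBuffer.allocate(MAX_PACKET);
        System.out.println(Arrays.toString(readFully(pipe.source(), reusable, 5)));
        pipe.sink().close();
        pipe.source().close();
    }
}
```

The same buffer can be passed to every call, which is the main practical difference from allocating a fresh byte [] per read.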

>Rémy
>







Re: cvs commit: jakarta-tomcat-connectors/jk/java/org/apache/jk/common ChannelNioSocket.java

Posted by Remy Maucherat <re...@apache.org>.
billbarker@apache.org wrote:
> billbarker    2005/04/23 21:27:42
> 
> Modified:    jk/java/org/apache/jk/common ChannelNioSocket.java Log: 
> Give up on switching between blocking/non-blocking Sockets, also move
> the Accept into the Poller instead of its own thread.
> 
> This is still very much experimental, and nobody should even dream of
> using it in production.
> 
> Testing on Windows, it's very flakey.  On Solaris, it's stable
> enough, but ChannelSocket is about 25% faster.

Does it behave about the same as the APR endpoint ? (I didn't 
try it - I run Windoze for my dev, BTW ;) )

I would like AJP and HTTP to share more code. Do you think that NIO is a 
better choice, or should it remain an experiment ? To me, it seems less 
mature, robust and portable (the most annoying problem probably being 
that to get bugfixes and feature updates, you need to upgrade the JVM), 
but I don't have a whole lot of experience.

Rémy
