You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2014/03/10 12:27:12 UTC

svn commit: r1575905 [1/4] - in /tomcat/trunk: ./ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/http11/upgrade/ java/org/apache/tomcat/util/net/ webapps/docs/ webapps/docs/config/

Author: remm
Date: Mon Mar 10 11:27:11 2014
New Revision: 1575905

URL: http://svn.apache.org/r1575905
Log:
Add experimental NIO2 connector. Based on code developed by Nabil Benothman.

Added:
    tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Processor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Protocol.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java
Modified:
    tomcat/trunk/build.xml
    tomcat/trunk/java/org/apache/coyote/http11/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java
    tomcat/trunk/webapps/docs/changelog.xml
    tomcat/trunk/webapps/docs/config/ajp.xml
    tomcat/trunk/webapps/docs/config/http.xml

Modified: tomcat/trunk/build.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/build.xml?rev=1575905&r1=1575904&r2=1575905&view=diff
==============================================================================
--- tomcat/trunk/build.xml (original)
+++ tomcat/trunk/build.xml Mon Mar 10 11:27:11 2014
@@ -1317,7 +1317,7 @@
   <property name="junit.formatter.extension" value=".txt" />
 
   <target name="test" description="Runs the JUnit test cases"
-          depends="test-bio,test-nio,test-apr,cobertura-report" >
+          depends="test-bio,test-nio,test-nio2,test-apr,cobertura-report" >
     <fail if="test.result.error" message='Some tests completed with an Error. See ${tomcat.build}/logs for details, search for "FAILED".' />
     <fail if="test.result.failure" message='Some tests completed with a Failure. See ${tomcat.build}/logs for details, search for "FAILED".' />
   </target>
@@ -1334,6 +1334,12 @@
               extension=".NIO" />
   </target>
 
+  <target name="test-nio2" description="Runs the JUnit test cases for NIO2. Does not stop on errors."
+	     depends="test-compile,deploy,cobertura-instrument" if="${execute.test.nio2}">
+    <runtests protocol="org.apache.coyote.http11.Http11Nio2Protocol"
+	          extension=".NIO2" />
+  </target>
+
   <target name="test-apr" description="Runs the JUnit test cases for APR. Does not stop on errors."
           depends="test-compile,deploy,test-apr-exists,cobertura-instrument"
           if="${apr.exists}">
@@ -1451,7 +1457,7 @@
   </target>
 
   <target name="cobertura-report" if="${test.cobertura}"
-          depends="test-bio,test-nio,test-apr"
+          depends="test-bio,test-nio,test-nio2,test-apr"
           description="Creates report from gathered Cobertura results">
 
     <cobertura-report srcdir="${basedir}/java" destdir="${cobertura.out}"

Added: tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Processor.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Processor.java (added)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Processor.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.ajp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Processes AJP requests using NIO2.
+ */
+public class AjpNio2Processor extends AbstractAjpProcessor<Nio2Channel> {
+
+    private static final Log log = LogFactory.getLog(AjpNio2Processor.class);
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+    /**
+     * The completion handler used for asynchronous write operations
+     */
+    protected CompletionHandler<Integer, SocketWrapper<Nio2Channel>> writeCompletionHandler;
+
+    /**
+     * Flipped flag for read buffer.
+     */
+    protected boolean flipped = false;
+
+    /**
+     * Write pending flag.
+     */
+    protected volatile boolean writePending = false;
+
+    public AjpNio2Processor(int packetSize, Nio2Endpoint endpoint0) {
+        super(packetSize, endpoint0);
+        response.setOutputBuffer(new SocketOutputBuffer());
+        this.writeCompletionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+            @Override
+            public void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+                boolean notify = false;
+                synchronized (writeCompletionHandler) {
+                    if (nBytes < 0) {
+                        failed(new IOException(sm.getString("ajpprocessor.failedsend")), attachment);
+                        return;
+                    }
+                    writePending = false;
+                    if (!Nio2Endpoint.isInline()) {
+                        notify = true;
+                    }
+                }
+                if (notify) {
+                    endpoint.processSocket(attachment, SocketStatus.OPEN_WRITE, false);
+                }
+            }
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                attachment.setError(true);
+                writePending = false;
+                endpoint.processSocket(attachment, SocketStatus.DISCONNECT, true);
+            }
+        };
+    }
+
+    @Override
+    public void recycle(boolean socketClosing) {
+        super.recycle(socketClosing);
+        writePending = false;
+        flipped = false;
+    }
+
+    @Override
+    protected void registerForEvent(boolean read, boolean write) {
+        final Nio2Endpoint.Nio2SocketWrapper attach =
+                (Nio2Endpoint.Nio2SocketWrapper) socketWrapper;
+        if (attach == null) {
+            return;
+        }
+        if (read) {
+            attach.interestOps(attach.interestOps() | Nio2Endpoint.OP_READ);
+        }
+        if (write) {
+            attach.interestOps(attach.interestOps() | Nio2Endpoint.OP_WRITE);
+        }
+    }
+
+    @Override
+    protected void resetTimeouts() {
+        // The NIO connector uses the timeout configured on the wrapper in the
+        // poller. Therefore, it needs to be reset once asycn processing has
+        // finished.
+        if (!error && socketWrapper != null &&
+                asyncStateMachine.isAsyncDispatching()) {
+            long soTimeout = endpoint.getSoTimeout();
+
+            //reset the timeout
+            if (keepAliveTimeout > 0) {
+                socketWrapper.setTimeout(keepAliveTimeout);
+            } else {
+                socketWrapper.setTimeout(soTimeout);
+            }
+        }
+
+    }
+
+
+    @Override
+    protected void setupSocket(SocketWrapper<Nio2Channel> socketWrapper)
+            throws IOException {
+        // NO-OP
+    }
+
+
+    @Override
+    protected void setTimeout(SocketWrapper<Nio2Channel> socketWrapper,
+            int timeout) throws IOException {
+        socketWrapper.setTimeout(timeout);
+    }
+
+
+    @Override
+    protected int output(byte[] src, int offset, int length, boolean block)
+            throws IOException {
+
+        if (socketWrapper == null || socketWrapper.getSocket() == null)
+            return -1;
+
+        ByteBuffer writeBuffer =
+                socketWrapper.getSocket().getBufHandler().getWriteBuffer();
+
+        int result = 0;
+        if (block) {
+            writeBuffer.clear();
+            writeBuffer.put(src, offset, length);
+            writeBuffer.flip();
+            try {
+                result = socketWrapper.getSocket().write(writeBuffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException | ExecutionException
+                    | TimeoutException e) {
+                throw new IOException(sm.getString("ajpprocessor.failedsend"), e);
+            }
+        } else {
+            synchronized (writeCompletionHandler) {
+                if (!writePending) {
+                    writeBuffer.clear();
+                    writeBuffer.put(src, offset, length);
+                    writeBuffer.flip();
+                    writePending = true;
+                    Nio2Endpoint.startInline();
+                    socketWrapper.getSocket().write(writeBuffer, socketWrapper.getTimeout(),
+                            TimeUnit.MILLISECONDS, socketWrapper, writeCompletionHandler);
+                    Nio2Endpoint.endInline();
+                    result = length;
+                }
+            }
+        }
+        return result;
+    }
+
+
+    @Override
+    protected boolean read(byte[] buf, int pos, int n, boolean blockFirstRead)
+        throws IOException {
+
+        int read = 0;
+        int res = 0;
+        boolean block = blockFirstRead;
+
+        while (read < n) {
+            res = readSocket(buf, read + pos, n - read, block);
+            if (res > 0) {
+                read += res;
+            } else if (res == 0 && !block) {
+                return false;
+            } else {
+                throw new IOException(sm.getString("ajpprocessor.failedread"));
+            }
+            block = true;
+        }
+        return true;
+    }
+
+
+    private int readSocket(byte[] buf, int pos, int n, boolean block)
+            throws IOException {
+        int nRead = 0;
+        ByteBuffer readBuffer =
+                socketWrapper.getSocket().getBufHandler().getReadBuffer();
+
+        if (block) {
+            if (!flipped) {
+                readBuffer.flip();
+                flipped = true;
+            }
+            if (readBuffer.remaining() > 0) {
+                nRead = Math.min(n, readBuffer.remaining());
+                readBuffer.get(buf, pos, nRead);
+                if (readBuffer.remaining() == 0) {
+                    readBuffer.clear();
+                    flipped = false;
+                }
+            } else {
+                readBuffer.clear();
+                flipped = false;
+                readBuffer.limit(n);
+                try {
+                    nRead = socketWrapper.getSocket().read(readBuffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException | ExecutionException
+                        | TimeoutException e) {
+                    throw new IOException(sm.getString("ajpprocessor.failedread"), e);
+                }
+                if (nRead > 0) {
+                    if (!flipped) {
+                        readBuffer.flip();
+                        flipped = true;
+                    }
+                    nRead = Math.min(n, readBuffer.remaining());
+                    readBuffer.get(buf, pos, nRead);
+                    if (readBuffer.remaining() == 0) {
+                        readBuffer.clear();
+                        flipped = false;
+                    }
+                }
+            }
+        } else {
+            if (!flipped) {
+                readBuffer.flip();
+                flipped = true;
+            }
+            if (readBuffer.remaining() > 0) {
+                nRead = Math.min(n, readBuffer.remaining());
+                readBuffer.get(buf, pos, nRead);
+                if (readBuffer.remaining() == 0) {
+                    readBuffer.clear();
+                    flipped = false;
+                }
+            } else {
+                readBuffer.clear();
+                flipped = false;
+                readBuffer.limit(n);
+            }
+        }
+        return nRead;
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Protocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Protocol.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Protocol.java (added)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNio2Protocol.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,158 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.ajp;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.coyote.AbstractProtocol;
+import org.apache.coyote.Processor;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.Nio2Endpoint.Handler;
+import org.apache.tomcat.util.net.SSLImplementation;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+
+/**
+ * Abstract the protocol implementation, including threading, etc.
+ * Processor is single threaded and specific to stream-based protocols,
+ * will not fit Jk protocols like JNI.
+ */
+public class AjpNio2Protocol extends AbstractAjpProtocol<Nio2Channel> {
+
+
+    private static final Log log = LogFactory.getLog(AjpNio2Protocol.class);
+
+    @Override
+    protected Log getLog() { return log; }
+
+
+    @Override
+    protected AbstractEndpoint.Handler getHandler() {
+        return cHandler;
+    }
+
+
+    // ------------------------------------------------------------ Constructor
+
+
+    public AjpNio2Protocol() {
+        endpoint = new Nio2Endpoint();
+        cHandler = new AjpConnectionHandler(this);
+        ((Nio2Endpoint) endpoint).setHandler(cHandler);
+        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
+        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
+        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
+        // AJP does not use Send File
+        ((Nio2Endpoint) endpoint).setUseSendfile(false);
+    }
+
+
+    // ----------------------------------------------------- Instance Variables
+
+
+    /**
+     * Connection handler for AJP.
+     */
+    private final AjpConnectionHandler cHandler;
+
+
+    // ----------------------------------------------------- JMX related methods
+
+    @Override
+    protected String getNamePrefix() {
+        return ("ajp-nio2");
+    }
+
+
+    // --------------------------------------  AjpConnectionHandler Inner Class
+
+
+    protected static class AjpConnectionHandler
+            extends AbstractAjpConnectionHandler<Nio2Channel, AjpNio2Processor>
+            implements Handler {
+
+        protected final AjpNio2Protocol proto;
+
+        public AjpConnectionHandler(AjpNio2Protocol proto) {
+            this.proto = proto;
+        }
+
+        @Override
+        protected AbstractProtocol<Nio2Channel> getProtocol() {
+            return proto;
+        }
+
+        @Override
+        protected Log getLog() {
+            return log;
+        }
+
+        @Override
+        public SSLImplementation getSslImplementation() {
+            // AJP does not support SSL
+            return null;
+        }
+
+        /**
+         * Expected to be used by the Poller to release resources on socket
+         * close, errors etc.
+         */
+        @Override
+        public void release(SocketWrapper<Nio2Channel> socket) {
+            Processor<Nio2Channel> processor =
+                    connections.remove(socket.getSocket());
+            if (processor != null) {
+                processor.recycle(true);
+                recycledProcessors.push(processor);
+            }
+        }
+
+        /**
+         * Expected to be used by the handler once the processor is no longer
+         * required.
+         */
+        @Override
+        public void release(SocketWrapper<Nio2Channel> socket,
+                Processor<Nio2Channel> processor, boolean isSocketClosing,
+                boolean addToPoller) {
+            processor.recycle(isSocketClosing);
+            recycledProcessors.push(processor);
+            if (addToPoller) {
+                ((Nio2Endpoint) proto.endpoint).awaitBytes(socket);
+            }
+        }
+
+        @Override
+        protected AjpNio2Processor createProcessor() {
+            AjpNio2Processor processor = new AjpNio2Processor(proto.packetSize, (Nio2Endpoint) proto.endpoint);
+            processor.setAdapter(proto.getAdapter());
+            processor.setTomcatAuthentication(proto.tomcatAuthentication);
+            processor.setRequiredSecret(proto.requiredSecret);
+            processor.setClientCertProvider(proto.getClientCertProvider());
+            register(processor);
+            return processor;
+        }
+
+        @Override
+        public void onCreateSSLEngine(SSLEngine engine) {
+        }
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,578 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.RequestInfo;
+import org.apache.coyote.http11.filters.BufferedInputFilter;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.SSLSupport;
+import org.apache.tomcat.util.net.SecureNio2Channel;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+
+/**
+ * Processes HTTP requests.
+ */
+public class Http11Nio2Processor extends AbstractHttp11Processor<Nio2Channel> {
+
+    private static final Log log = LogFactory.getLog(Http11Nio2Processor.class);
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+
+    /**
+     * SSL information.
+     */
+    protected SSLSupport sslSupport;
+
+    // ----------------------------------------------------------- Constructors
+
+
+    public Http11Nio2Processor(int maxHttpHeaderSize, Nio2Endpoint endpoint,
+            int maxTrailerSize, int maxExtensionSize) {
+
+        super(endpoint);
+
+        inputBuffer = new InternalNio2InputBuffer(request, maxHttpHeaderSize);
+        request.setInputBuffer(inputBuffer);
+
+        outputBuffer = new InternalNio2OutputBuffer(response, maxHttpHeaderSize);
+        response.setOutputBuffer(outputBuffer);
+
+        initializeFilters(maxTrailerSize, maxExtensionSize);
+    }
+
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * Sendfile data.
+     */
+    protected Nio2Endpoint.SendfileData sendfileData = null;
+
+
+    // --------------------------------------------------------- Public Methods
+
+    @Override
+    public SocketState event(SocketStatus status)
+        throws IOException {
+
+        long soTimeout = endpoint.getSoTimeout();
+
+        RequestInfo rp = request.getRequestProcessor();
+        final SocketWrapper<Nio2Channel> attach = socketWrapper;
+        try {
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            error = !getAdapter().event(request, response, status);
+            if ( !error ) {
+                if (attach != null) {
+                    attach.setComet(comet);
+                    if (comet) {
+                        Integer comettimeout = (Integer) request.getAttribute(
+                                org.apache.coyote.Constants.COMET_TIMEOUT_ATTR);
+                        if (comettimeout != null) {
+                            attach.setTimeout(comettimeout.longValue());
+                        }
+                    } else {
+                        //reset the timeout
+                        if (keepAlive) {
+                            attach.setTimeout(keepAliveTimeout);
+                        } else {
+                            attach.setTimeout(soTimeout);
+                        }
+                    }
+
+                }
+            }
+        } catch (InterruptedIOException e) {
+            error = true;
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            log.error(sm.getString("http11processor.request.process"), t);
+            // 500 - Internal Server Error
+            response.setStatus(500);
+            getAdapter().log(request, response, 0);
+            error = true;
+        }
+
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+        if (error || status==SocketStatus.STOP) {
+            return SocketState.CLOSED;
+        } else if (!comet) {
+            if (keepAlive) {
+                inputBuffer.nextRequest();
+                outputBuffer.nextRequest();
+                return SocketState.OPEN;
+            } else {
+                return SocketState.CLOSED;
+            }
+        } else {
+            return SocketState.LONG;
+        }
+    }
+
+    @Override
+    protected void registerForEvent(boolean read, boolean write) {
+        final Nio2Endpoint.Nio2SocketWrapper attach =
+                (Nio2Endpoint.Nio2SocketWrapper) socketWrapper;
+        if (attach == null) {
+            return;
+        }
+        if (read) {
+            attach.interestOps(attach.interestOps() | Nio2Endpoint.OP_READ);
+        }
+        if (write) {
+            attach.interestOps(attach.interestOps() | Nio2Endpoint.OP_WRITE);
+        }
+    }
+
+
+    @Override
+    protected void resetTimeouts() {
+        if (!error && socketWrapper != null &&
+                asyncStateMachine.isAsyncDispatching()) {
+            long soTimeout = endpoint.getSoTimeout();
+
+            //reset the timeout
+            if (keepAlive) {
+                socketWrapper.setTimeout(keepAliveTimeout);
+            } else {
+                socketWrapper.setTimeout(soTimeout);
+            }
+        }
+    }
+
+
+    @Override
+    protected boolean disableKeepAlive() {
+        return false;
+    }
+
+
+    @Override
+    protected void setRequestLineReadTimeout() throws IOException {
+        // socket.setTimeout()
+        //     - timeout used by poller
+        // socket.getSocket().getIOChannel().socket().setSoTimeout()
+        //     - timeout used for blocking reads
+
+        // When entering the processing loop there will always be data to read
+        // so no point changing timeouts at this point
+
+        // For the second and subsequent executions of the processing loop, a
+        // non-blocking read is used so again no need to set the timeouts
+
+        // Because NIO supports non-blocking reading of the request line and
+        // headers the timeouts need to be set when returning the socket to
+        // the poller rather than here.
+
+        // NO-OP
+    }
+
+
+    @Override
+    protected boolean handleIncompleteRequestLineRead() {
+        // Haven't finished reading the request so keep the socket
+        // open
+        openSocket = true;
+        // Check to see if we have read any of the request line yet
+        if (((InternalNio2InputBuffer)
+                inputBuffer).getParsingRequestLinePhase() < 1) {
+            if (socketWrapper.getLastAccess() > -1 || keptAlive) {
+                // Haven't read the request line and have previously processed a
+                // request. Must be keep-alive. Make sure poller uses keepAlive.
+                socketWrapper.setTimeout(endpoint.getKeepAliveTimeout());
+            }
+        } else {
+            // Started to read request line. Need to keep processor
+            // associated with socket
+            readComplete = false;
+            // Make sure poller uses soTimeout from here onwards
+            socketWrapper.setTimeout(endpoint.getSoTimeout());
+        }
+        if (endpoint.isPaused()) {
+            // 503 - Service unavailable
+            response.setStatus(503);
+            getAdapter().log(request, response, 0);
+            error = true;
+        } else {
+            return true;
+        }
+        return false;
+    }
+
+
+    @Override
+    protected void setSocketTimeout(int timeout) throws IOException {
+        // Not relevant for NIO2
+    }
+
+
+    @Override
+    protected void setCometTimeouts(SocketWrapper<Nio2Channel> socketWrapper) {
+        final Nio2Endpoint.Nio2SocketWrapper attach =
+                (Nio2Endpoint.Nio2SocketWrapper)socketWrapper;
+        if (attach != null)  {
+            attach.setComet(comet);
+            if (comet) {
+                Integer comettimeout = (Integer) request.getAttribute(
+                        org.apache.coyote.Constants.COMET_TIMEOUT_ATTR);
+                if (comettimeout != null) {
+                    attach.setTimeout(comettimeout.longValue());
+                }
+            }
+        }
+    }
+
+
+    @Override
+    protected boolean breakKeepAliveLoop(
+            SocketWrapper<Nio2Channel> socketWrapper) {
+        openSocket = keepAlive;
+        // Do sendfile as needed: add socket to sendfile and end
+        if (sendfileData != null && !error) {
+            ((Nio2Endpoint.Nio2SocketWrapper) socketWrapper).setSendfileData(sendfileData);
+            sendfileData.keepAlive = keepAlive;
+            if (((Nio2Endpoint) endpoint).processSendfile(
+                    (Nio2Endpoint.Nio2SocketWrapper) socketWrapper)) {
+                sendfileInProgress = true;
+            } else {
+                // Write failed
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("http11processor.sendfile.error"));
+                }
+                error = true;
+            }
+            return true;
+        }
+        return false;
+    }
+
+
+    @Override
+    public void recycleInternal() {
+        socketWrapper = null;
+        sendfileData = null;
+    }
+
+
+    // ----------------------------------------------------- ActionHook Methods
+
+
+    /**
+     * Send an action to the connector.
+     *
+     * @param actionCode Type of the action
+     * @param param Action parameter
+     */
+    @Override
+    public void actionInternal(ActionCode actionCode, Object param) {
+
+        if (actionCode == ActionCode.REQ_HOST_ADDR_ATTRIBUTE) {
+
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                request.remoteAddr().recycle();
+            } else {
+                if (socketWrapper.getRemoteAddr() == null) {
+                    InetAddress inetAddr = null;
+                    try {
+                        inetAddr = ((InetSocketAddress) socketWrapper.getSocket().getIOChannel().getRemoteAddress()).getAddress();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                    if (inetAddr != null) {
+                        socketWrapper.setRemoteAddr(inetAddr.getHostAddress());
+                    }
+                }
+                request.remoteAddr().setString(socketWrapper.getRemoteAddr());
+            }
+
+        } else if (actionCode == ActionCode.REQ_LOCAL_NAME_ATTRIBUTE) {
+
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                request.localName().recycle();
+            } else {
+                if (socketWrapper.getLocalName() == null) {
+                    InetAddress inetAddr = null;
+                    try {
+                        inetAddr = ((InetSocketAddress) socketWrapper.getSocket().getIOChannel().getLocalAddress()).getAddress();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                    if (inetAddr != null) {
+                        socketWrapper.setLocalName(inetAddr.getHostName());
+                    }
+                }
+                request.localName().setString(socketWrapper.getLocalName());
+            }
+
+        } else if (actionCode == ActionCode.REQ_HOST_ATTRIBUTE) {
+
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                request.remoteHost().recycle();
+            } else {
+                if (socketWrapper.getRemoteHost() == null) {
+                    InetAddress inetAddr = null;
+                    try {
+                        inetAddr = ((InetSocketAddress) socketWrapper.getSocket().getIOChannel().getRemoteAddress()).getAddress();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                    if (inetAddr != null) {
+                        socketWrapper.setRemoteHost(inetAddr.getHostName());
+                    }
+                    if (socketWrapper.getRemoteHost() == null) {
+                        if (socketWrapper.getRemoteAddr() == null &&
+                                inetAddr != null) {
+                            socketWrapper.setRemoteAddr(inetAddr.getHostAddress());
+                        }
+                        if (socketWrapper.getRemoteAddr() != null) {
+                            socketWrapper.setRemoteHost(socketWrapper.getRemoteAddr());
+                        }
+                    }
+                }
+                request.remoteHost().setString(socketWrapper.getRemoteHost());
+            }
+
+        } else if (actionCode == ActionCode.REQ_LOCAL_ADDR_ATTRIBUTE) {
+
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                request.localAddr().recycle();
+            } else {
+                if (socketWrapper.getLocalAddr() == null) {
+                    try {
+                        socketWrapper.setLocalAddr(
+                                ((InetSocketAddress) socketWrapper.getSocket().getIOChannel().getLocalAddress()).getAddress().getHostAddress());
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
+                request.localAddr().setString(socketWrapper.getLocalAddr());
+            }
+
+        } else if (actionCode == ActionCode.REQ_REMOTEPORT_ATTRIBUTE) {
+
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                request.setRemotePort(0);
+            } else {
+                if (socketWrapper.getRemotePort() == -1) {
+                    try {
+                        socketWrapper.setRemotePort(((InetSocketAddress) socketWrapper.getSocket().getIOChannel().getRemoteAddress()).getPort());
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
+                request.setRemotePort(socketWrapper.getRemotePort());
+            }
+
+        } else if (actionCode == ActionCode.REQ_LOCALPORT_ATTRIBUTE) {
+
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                request.setLocalPort(0);
+            } else {
+                if (socketWrapper.getLocalPort() == -1) {
+                    try {
+                        socketWrapper.setLocalPort(((InetSocketAddress) socketWrapper.getSocket().getIOChannel().getLocalAddress()).getPort());
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
+                request.setLocalPort(socketWrapper.getLocalPort());
+            }
+
+        } else if (actionCode == ActionCode.REQ_SSL_ATTRIBUTE ) {
+
+            try {
+                if (sslSupport != null) {
+                    Object sslO = sslSupport.getCipherSuite();
+                    if (sslO != null) {
+                        request.setAttribute
+                            (SSLSupport.CIPHER_SUITE_KEY, sslO);
+                    }
+                    sslO = sslSupport.getPeerCertificateChain(false);
+                    if (sslO != null) {
+                        request.setAttribute
+                            (SSLSupport.CERTIFICATE_KEY, sslO);
+                    }
+                    sslO = sslSupport.getKeySize();
+                    if (sslO != null) {
+                        request.setAttribute
+                            (SSLSupport.KEY_SIZE_KEY, sslO);
+                    }
+                    sslO = sslSupport.getSessionId();
+                    if (sslO != null) {
+                        request.setAttribute
+                            (SSLSupport.SESSION_ID_KEY, sslO);
+                    }
+                    request.setAttribute(SSLSupport.SESSION_MGR, sslSupport);
+                }
+            } catch (Exception e) {
+                log.warn(sm.getString("http11processor.socket.ssl"), e);
+            }
+
+        } else if (actionCode == ActionCode.REQ_SSL_CERTIFICATE) {
+
+            if (sslSupport != null && socketWrapper.getSocket() != null) {
+                /*
+                 * Consume and buffer the request body, so that it does not
+                 * interfere with the client's handshake messages
+                 */
+                InputFilter[] inputFilters = inputBuffer.getFilters();
+                ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER])
+                    .setLimit(maxSavePostSize);
+                inputBuffer.addActiveFilter
+                    (inputFilters[Constants.BUFFERED_FILTER]);
+                SecureNio2Channel sslChannel = (SecureNio2Channel) socketWrapper.getSocket();
+                SSLEngine engine = sslChannel.getSslEngine();
+                if (!engine.getNeedClientAuth()) {
+                    // Need to re-negotiate SSL connection
+                    engine.setNeedClientAuth(true);
+                    try {
+                        sslChannel.rehandshake();
+                        sslSupport = ((Nio2Endpoint)endpoint).getHandler()
+                                .getSslImplementation().getSSLSupport(
+                                        engine.getSession());
+                    } catch (IOException ioe) {
+                        log.warn(sm.getString("http11processor.socket.sslreneg",ioe));
+                    }
+                }
+
+                try {
+                    // use force=false since re-negotiation is handled above
+                    // (and it is a NO-OP for NIO anyway)
+                    Object sslO = sslSupport.getPeerCertificateChain(false);
+                    if( sslO != null) {
+                        request.setAttribute
+                            (SSLSupport.CERTIFICATE_KEY, sslO);
+                    }
+                } catch (Exception e) {
+                    log.warn(sm.getString("http11processor.socket.ssl"), e);
+                }
+            }
+        } else if (actionCode == ActionCode.COMET_BEGIN) {
+            comet = true;
+        } else if (actionCode == ActionCode.COMET_END) {
+            comet = false;
+        } else if (actionCode == ActionCode.COMET_CLOSE) {
+            if (socketWrapper == null || socketWrapper.getSocket() == null) {
+                return;
+            }
+            RequestInfo rp = request.getRequestProcessor();
+            if (rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE) {
+                // Close event for this processor triggered by request
+                // processing in another processor, a non-Tomcat thread (i.e.
+                // an application controlled thread) or similar.
+                endpoint.processSocket(this.socketWrapper, SocketStatus.OPEN_READ, true);
+            }
+        } else if (actionCode == ActionCode.COMET_SETTIMEOUT) {
+            if (param == null) {
+                return;
+            }
+            if (socketWrapper == null) {
+                return;
+            }
+            long timeout = ((Long)param).longValue();
+            //if we are not piggy backing on a worker thread, set the timeout
+            RequestInfo rp = request.getRequestProcessor();
+            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {
+                socketWrapper.setTimeout(timeout);
+            }
+        } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
+            socketWrapper.clearDispatches();
+            if (asyncStateMachine.asyncComplete()) {
+                endpoint.processSocket(this.socketWrapper, SocketStatus.OPEN_READ, true);
+            }
+        } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
+            if (param == null) {
+                return;
+            }
+            if (socketWrapper == null) {
+                return;
+            }
+            long timeout = ((Long)param).longValue();
+            //if we are not piggy backing on a worker thread, set the timeout
+            socketWrapper.setTimeout(timeout);
+        } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
+            if (asyncStateMachine.asyncDispatch()) {
+                endpoint.processSocket(this.socketWrapper, SocketStatus.OPEN_READ, true);
+            }
+        }
+    }
+
+
+    // ------------------------------------------------------ Protected Methods
+
+
+    @Override
+    protected void prepareRequestInternal() {
+        sendfileData = null;
+    }
+
+    @Override
+    protected boolean prepareSendfile(OutputFilter[] outputFilters) {
+        String fileName = (String) request.getAttribute(
+                org.apache.coyote.Constants.SENDFILE_FILENAME_ATTR);
+        if (fileName != null) {
+            // No entity body sent here
+            outputBuffer.addActiveFilter(outputFilters[Constants.VOID_FILTER]);
+            contentDelimitation = true;
+            sendfileData = new Nio2Endpoint.SendfileData();
+            sendfileData.fileName = fileName;
+            sendfileData.pos = ((Long) request.getAttribute(
+                    org.apache.coyote.Constants.SENDFILE_FILE_START_ATTR)).longValue();
+            sendfileData.length = ((Long) request.getAttribute(
+                    org.apache.coyote.Constants.SENDFILE_FILE_END_ATTR)).longValue() - sendfileData.pos;
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    protected AbstractInputBuffer<Nio2Channel> getInputBuffer() {
+        return inputBuffer;
+    }
+
+    @Override
+    protected AbstractOutputBuffer<Nio2Channel> getOutputBuffer() {
+        return outputBuffer;
+    }
+
+    /**
+     * Set the SSL information for this HTTP connection.
+     */
+    @Override
+    public void setSslSupport(SSLSupport sslSupport) {
+        this.sslSupport = sslSupport;
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,278 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11;
+
+import java.io.IOException;
+
+import javax.net.ssl.SSLEngine;
+import javax.servlet.http.HttpUpgradeHandler;
+
+import org.apache.coyote.AbstractProtocol;
+import org.apache.coyote.Processor;
+import org.apache.coyote.http11.upgrade.Nio2Processor;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.Nio2Endpoint.Handler;
+import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
+import org.apache.tomcat.util.net.SSLImplementation;
+import org.apache.tomcat.util.net.SecureNio2Channel;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+
+/**
+ * HTTP/1.1 protocol implementation using NIO2.
+ */
+public class Http11Nio2Protocol extends AbstractHttp11JsseProtocol<Nio2Channel> {
+
+    private static final Log log = LogFactory.getLog(Http11Nio2Protocol.class);
+
+
+    @Override
+    protected Log getLog() { return log; }
+
+
+    @Override
+    protected AbstractEndpoint.Handler getHandler() {
+        return cHandler;
+    }
+
+
+    public Http11Nio2Protocol() {
+        endpoint=new Nio2Endpoint();
+        cHandler = new Http11ConnectionHandler(this);
+        ((Nio2Endpoint) endpoint).setHandler(cHandler);
+        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
+        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
+        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
+    }
+
+
+    public Nio2Endpoint getEndpoint() {
+        return ((Nio2Endpoint)endpoint);
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        if (npnHandler != null) {
+            npnHandler.init(getEndpoint(), 0, getAdapter());
+        }
+    }
+
+    // -------------------- Properties--------------------
+
+    private final Http11ConnectionHandler cHandler;
+
+    // -------------------- Pool setup --------------------
+
+    public void setAcceptorThreadPriority(int threadPriority) {
+        ((Nio2Endpoint)endpoint).setAcceptorThreadPriority(threadPriority);
+    }
+
+    public void setPollerThreadPriority(int threadPriority) {
+        ((Nio2Endpoint)endpoint).setPollerThreadPriority(threadPriority);
+    }
+
+    public int getAcceptorThreadPriority() {
+      return ((Nio2Endpoint)endpoint).getAcceptorThreadPriority();
+    }
+
+    public int getPollerThreadPriority() {
+      return ((Nio2Endpoint)endpoint).getThreadPriority();
+    }
+
+    public boolean getUseSendfile() {
+        return endpoint.getUseSendfile();
+    }
+
+    public void setUseSendfile(boolean useSendfile) {
+        ((Nio2Endpoint)endpoint).setUseSendfile(useSendfile);
+    }
+
+    // -------------------- Tcp setup --------------------
+
+    public void setOomParachute(int oomParachute) {
+        ((Nio2Endpoint)endpoint).setOomParachute(oomParachute);
+    }
+
+    // ----------------------------------------------------- JMX related methods
+
+    @Override
+    protected String getNamePrefix() {
+        return ("http-nio2");
+    }
+
+
+    // --------------------  Connection handler --------------------
+
+    protected static class Http11ConnectionHandler
+            extends AbstractConnectionHandler<Nio2Channel,Http11Nio2Processor>
+            implements Handler {
+
+        protected Http11Nio2Protocol proto;
+
+        Http11ConnectionHandler(Http11Nio2Protocol proto) {
+            this.proto = proto;
+        }
+
+        @Override
+        protected AbstractProtocol<Nio2Channel> getProtocol() {
+            return proto;
+        }
+
+        @Override
+        protected Log getLog() {
+            return log;
+        }
+
+
+        @Override
+        public SSLImplementation getSslImplementation() {
+            return proto.sslImplementation;
+        }
+
+        /**
+         * Expected to be used by the Poller to release resources on socket
+         * close, errors etc.
+         */
+        @Override
+        public void release(SocketWrapper<Nio2Channel> socket) {
+            Processor<Nio2Channel> processor =
+                connections.remove(socket.getSocket());
+            if (processor != null) {
+                processor.recycle(true);
+                recycledProcessors.push(processor);
+            }
+        }
+
+        @Override
+        public SocketState process(SocketWrapper<Nio2Channel> socket,
+                SocketStatus status) {
+            if (proto.npnHandler != null) {
+                SocketState ss = proto.npnHandler.process(socket, status);
+                if (ss != SocketState.OPEN) {
+                    return ss;
+                }
+            }
+            return super.process(socket, status);
+        }
+
+
+        /**
+         * Expected to be used by the handler once the processor is no longer
+         * required.
+         *
+         * @param socket
+         * @param processor
+         * @param isSocketClosing   Not used in HTTP
+         * @param addToPoller
+         */
+        @Override
+        public void release(SocketWrapper<Nio2Channel> socket,
+                Processor<Nio2Channel> processor, boolean isSocketClosing,
+                boolean addToPoller) {
+            processor.recycle(isSocketClosing);
+            recycledProcessors.push(processor);
+            if (socket.isAsync()) {
+                ((Nio2Endpoint) proto.endpoint).removeTimeout(socket);
+            }
+            if (addToPoller) {
+                ((Nio2Endpoint) proto.endpoint).awaitBytes(socket);
+            }
+        }
+
+
+        @Override
+        protected void initSsl(SocketWrapper<Nio2Channel> socket,
+                Processor<Nio2Channel> processor) {
+            if (proto.isSSLEnabled() &&
+                    (proto.sslImplementation != null)
+                    && (socket.getSocket() instanceof SecureNio2Channel)) {
+                SecureNio2Channel ch = (SecureNio2Channel)socket.getSocket();
+                processor.setSslSupport(
+                        proto.sslImplementation.getSSLSupport(
+                                ch.getSslEngine().getSession()));
+            } else {
+                processor.setSslSupport(null);
+            }
+
+        }
+
+        @Override
+        protected void longPoll(SocketWrapper<Nio2Channel> socket,
+                Processor<Nio2Channel> processor) {
+            if (processor.isAsync()) {
+                socket.setAsync(true);
+                ((Nio2Endpoint) proto.endpoint).addTimeout(socket);
+            } else if (processor.isUpgrade()) {
+                if (((Nio2SocketWrapper) socket).isUpgradeInit()) {
+                    ((Nio2Endpoint) proto.endpoint).awaitBytes(socket);
+                }
+            } else {
+                // Either:
+                //  - this is comet request
+                //  - this is an upgraded connection
+                //  - the request line/headers have not been completely
+                //    read
+                // The completion handlers should be in place,
+                // so nothing to do here
+            }
+        }
+
+        @Override
+        public Http11Nio2Processor createProcessor() {
+            Http11Nio2Processor processor = new Http11Nio2Processor(
+                    proto.getMaxHttpHeaderSize(), (Nio2Endpoint)proto.endpoint,
+                    proto.getMaxTrailerSize(), proto.getMaxExtensionSize());
+            processor.setAdapter(proto.getAdapter());
+            processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
+            processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
+            processor.setConnectionUploadTimeout(
+                    proto.getConnectionUploadTimeout());
+            processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
+            processor.setCompressionMinSize(proto.getCompressionMinSize());
+            processor.setCompression(proto.getCompression());
+            processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
+            processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
+            processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
+            processor.setSocketBuffer(proto.getSocketBuffer());
+            processor.setMaxSavePostSize(proto.getMaxSavePostSize());
+            processor.setServer(proto.getServer());
+            register(processor);
+            return processor;
+        }
+
+        @Override
+        protected Processor<Nio2Channel> createUpgradeProcessor(
+                SocketWrapper<Nio2Channel> socket,
+                HttpUpgradeHandler httpUpgradeProcessor)
+                throws IOException {
+            return new Nio2Processor(socket, httpUpgradeProcessor);
+        }
+
+        @Override
+        public void onCreateSSLEngine(SSLEngine engine) {
+            if (proto.npnHandler != null) {
+                proto.npnHandler.onCreateEngine(engine);
+            }
+        }
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,930 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.ReadPendingException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.servlet.RequestDispatcher;
+
+import org.apache.coyote.InputBuffer;
+import org.apache.coyote.Request;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Output buffer implementation for NIO2.
+ */
+public class InternalNio2InputBuffer extends AbstractInputBuffer<Nio2Channel> {
+
+    private static final Log log =
+            LogFactory.getLog(InternalNio2InputBuffer.class);
+
+    // -------------------------------------------------------------- Constants
+
+    enum HeaderParseStatus {
+        DONE, HAVE_MORE_HEADERS, NEED_MORE_DATA
+    }
+
+    enum HeaderParsePosition {
+        /**
+         * Start of a new header. A CRLF here means that there are no more
+         * headers. Any other character starts a header name.
+         */
+        HEADER_START,
+        /**
+         * Reading a header name. All characters of header are HTTP_TOKEN_CHAR.
+         * Header name is followed by ':'. No whitespace is allowed.<br />
+         * Any non-HTTP_TOKEN_CHAR (this includes any whitespace) encountered
+         * before ':' will result in the whole line being ignored.
+         */
+        HEADER_NAME,
+        /**
+         * Skipping whitespace before text of header value starts, either on the
+         * first line of header value (just after ':') or on subsequent lines
+         * when it is known that subsequent line starts with SP or HT.
+         */
+        HEADER_VALUE_START,
+        /**
+         * Reading the header value. We are inside the value. Either on the
+         * first line or on any subsequent line. We come into this state from
+         * HEADER_VALUE_START after the first non-SP/non-HT byte is encountered
+         * on the line.
+         */
+        HEADER_VALUE,
+        /**
+         * Before reading a new line of a header. Once the next byte is peeked,
+         * the state changes without advancing our position. The state becomes
+         * either HEADER_VALUE_START (if that first byte is SP or HT), or
+         * HEADER_START (otherwise).
+         */
+        HEADER_MULTI_LINE,
+        /**
+         * Reading all bytes until the next CRLF. The line is being ignored.
+         */
+        HEADER_SKIPLINE
+    }
+
+    // ----------------------------------------------------------- Constructors
+
+
+    /**
+     * Alternate constructor.
+     */
+    public InternalNio2InputBuffer(Request request, int headerBufferSize) {
+
+        this.request = request;
+        headers = request.getMimeHeaders();
+
+        this.headerBufferSize = headerBufferSize;
+
+        inputStreamInputBuffer = new SocketInputBuffer();
+
+        filterLibrary = new InputFilter[0];
+        activeFilters = new InputFilter[0];
+        lastActiveFilter = -1;
+
+        parsingHeader = true;
+        parsingRequestLine = true;
+        parsingRequestLinePhase = 0;
+        parsingRequestLineEol = false;
+        parsingRequestLineStart = 0;
+        parsingRequestLineQPos = -1;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        headerData.recycle();
+        swallowInput = true;
+
+    }
+
+    /**
+     * Parsing state - used for non blocking parsing so that
+     * when more data arrives, we can pick up where we left off.
+     */
+    private boolean parsingRequestLine;
+    private int parsingRequestLinePhase = 0;
+    private boolean parsingRequestLineEol = false;
+    private int parsingRequestLineStart = 0;
+    private int parsingRequestLineQPos = -1;
+    private HeaderParsePosition headerParsePos;
+
+    /**
+     * Underlying socket.
+     */
+    private SocketWrapper<Nio2Channel> socket;
+
+    /**
+     * Maximum allowed size of the HTTP request line plus headers plus any
+     * leading blank lines.
+     */
+    private final int headerBufferSize;
+
+    /**
+     * Known size of the NioChannel read buffer.
+     */
+    private int socketReadBufferSize;
+
+    /**
+     * The completion handler used for asynchronous read operations
+     */
+    private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> completionHandler;
+
+    /**
+     * The associated endpoint.
+     */
+    protected AbstractEndpoint<Nio2Channel> endpoint = null;
+
+    /**
+     * Read pending flag.
+     */
+    protected volatile boolean readPending = false;
+
+    /**
+     * Exception that occurred during writing.
+     */
+    protected IOException e = null;
+
+    /**
+     * Track if the byte buffer is flipped
+     */
+    protected volatile boolean flipped = false;
+
+    // --------------------------------------------------------- Public Methods
+
+    @Override
+    protected final Log getLog() {
+        return log;
+    }
+
+
+    /**
+     * Recycle the input buffer. This should be called when closing the
+     * connection.
+     */
+    @Override
+    public void recycle() {
+        super.recycle();
+        socket = null;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        parsingRequestLinePhase = 0;
+        parsingRequestLineEol = false;
+        parsingRequestLineStart = 0;
+        parsingRequestLineQPos = -1;
+        headerData.recycle();
+        readPending = false;
+        flipped = false;
+        e = null;
+    }
+
+
+    /**
+     * End processing of current HTTP request.
+     * Note: All bytes of the current request should have been already
+     * consumed. This method only resets all the pointers so that we are ready
+     * to parse the next HTTP request.
+     */
+    @Override
+    public void nextRequest() {
+        super.nextRequest();
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        parsingRequestLinePhase = 0;
+        parsingRequestLineEol = false;
+        parsingRequestLineStart = 0;
+        parsingRequestLineQPos = -1;
+        headerData.recycle();
+    }
+
+    /**
+     * Read the request line. This function is meant to be used during the
+     * HTTP request header parsing. Do NOT attempt to read the request body
+     * using it.
+     *
+     * @throws IOException If an exception occurs during the underlying socket
+     * read operations, or if the given buffer is not big enough to accommodate
+     * the whole line.
+     * @return true if data is properly fed; false if no data is available
+     * immediately and thread should be freed
+     */
+    @Override
+    public boolean parseRequestLine(boolean useAvailableDataOnly)
+        throws IOException {
+
+        //check state
+        if ( !parsingRequestLine ) return true;
+        //
+        // Skipping blank lines
+        //
+        if ( parsingRequestLinePhase < 2 ) {
+            byte chr = 0;
+            do {
+
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (useAvailableDataOnly) {
+                        return false;
+                    }
+                    // Do a simple read with a short timeout
+                    if (!fill(false)) {
+                        // A read is pending, so no longer in initial state
+                        parsingRequestLinePhase = 1;
+                        return false;
+                    }
+                }
+                chr = buf[pos++];
+            } while ((chr == Constants.CR) || (chr == Constants.LF));
+            pos--;
+
+            parsingRequestLineStart = pos;
+            parsingRequestLinePhase = 2;
+            if (log.isDebugEnabled()) {
+                log.debug("Received ["
+                        + new String(buf, pos, lastValid - pos,
+                                StandardCharsets.ISO_8859_1)
+                        + "]");
+            }
+        }
+        if ( parsingRequestLinePhase == 2 ) {
+            //
+            // Reading the method name
+            // Method name is always US-ASCII
+            //
+            boolean space = false;
+            while (!space) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(false)) //request line parsing
+                        return false;
+                }
+                // Spec says no CR or LF in method name
+                if (buf[pos] == Constants.CR || buf[pos] == Constants.LF) {
+                    throw new IllegalArgumentException(
+                            sm.getString("iib.invalidmethod"));
+                }
+                if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
+                    space = true;
+                    request.method().setBytes(buf, parsingRequestLineStart, pos - parsingRequestLineStart);
+                }
+                pos++;
+            }
+            parsingRequestLinePhase = 3;
+        }
+        if ( parsingRequestLinePhase == 3 ) {
+            // Spec says single SP but also be tolerant of multiple and/or HT
+            boolean space = true;
+            while (space) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(false)) //request line parsing
+                        return false;
+                }
+                if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
+                    pos++;
+                } else {
+                    space = false;
+                }
+            }
+            parsingRequestLineStart = pos;
+            parsingRequestLinePhase = 4;
+        }
+        if (parsingRequestLinePhase == 4) {
+            // Mark the current buffer position
+
+            int end = 0;
+            //
+            // Reading the URI
+            //
+            boolean space = false;
+            while (!space) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(false)) //request line parsing
+                        return false;
+                }
+                if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
+                    space = true;
+                    end = pos;
+                } else if ((buf[pos] == Constants.CR)
+                           || (buf[pos] == Constants.LF)) {
+                    // HTTP/0.9 style request
+                    parsingRequestLineEol = true;
+                    space = true;
+                    end = pos;
+                } else if ((buf[pos] == Constants.QUESTION)
+                           && (parsingRequestLineQPos == -1)) {
+                    parsingRequestLineQPos = pos;
+                }
+                pos++;
+            }
+            request.unparsedURI().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart);
+            if (parsingRequestLineQPos >= 0) {
+                request.queryString().setBytes(buf, parsingRequestLineQPos + 1,
+                                               end - parsingRequestLineQPos - 1);
+                request.requestURI().setBytes(buf, parsingRequestLineStart, parsingRequestLineQPos - parsingRequestLineStart);
+            } else {
+                request.requestURI().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart);
+            }
+            parsingRequestLinePhase = 5;
+        }
+        if ( parsingRequestLinePhase == 5 ) {
+            // Spec says single SP but also be tolerant of multiple and/or HT
+            boolean space = true;
+            while (space) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(false)) //request line parsing
+                        return false;
+                }
+                if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
+                    pos++;
+                } else {
+                    space = false;
+                }
+            }
+            parsingRequestLineStart = pos;
+            parsingRequestLinePhase = 6;
+
+            // Mark the current buffer position
+            end = 0;
+        }
+        if (parsingRequestLinePhase == 6) {
+            //
+            // Reading the protocol
+            // Protocol is always US-ASCII
+            //
+            while (!parsingRequestLineEol) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(false)) //request line parsing
+                        return false;
+                }
+
+                if (buf[pos] == Constants.CR) {
+                    end = pos;
+                } else if (buf[pos] == Constants.LF) {
+                    if (end == 0)
+                        end = pos;
+                    parsingRequestLineEol = true;
+                }
+                pos++;
+            }
+
+            if ( (end - parsingRequestLineStart) > 0) {
+                request.protocol().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart);
+            } else {
+                request.protocol().setString("");
+            }
+            parsingRequestLine = false;
+            parsingRequestLinePhase = 0;
+            parsingRequestLineEol = false;
+            parsingRequestLineStart = 0;
+            return true;
+        }
+        throw new IllegalStateException("Invalid request line parse phase:"+parsingRequestLinePhase);
+    }
+
+    private void expand(int newsize) {
+        if ( newsize > buf.length ) {
+            if (parsingHeader) {
+                throw new IllegalArgumentException(
+                        sm.getString("iib.requestheadertoolarge.error"));
+            }
+            // Should not happen
+            log.warn("Expanding buffer size. Old size: " + buf.length
+                    + ", new size: " + newsize, new Exception());
+            byte[] tmp = new byte[newsize];
+            System.arraycopy(buf,0,tmp,0,buf.length);
+            buf = tmp;
+        }
+    }
+
+    /**
+     * Parse the HTTP headers.
+     */
+    @Override
+    public boolean parseHeaders()
+        throws IOException {
+        if (!parsingHeader) {
+            throw new IllegalStateException(
+                    sm.getString("iib.parseheaders.ise.error"));
+        }
+
+        HeaderParseStatus status = HeaderParseStatus.HAVE_MORE_HEADERS;
+
+        do {
+            status = parseHeader();
+            // Checking that
+            // (1) Headers plus request line size does not exceed its limit
+            // (2) There are enough bytes to avoid expanding the buffer when
+            // reading body
+            // Technically, (2) is technical limitation, (1) is logical
+            // limitation to enforce the meaning of headerBufferSize
+            // From the way how buf is allocated and how blank lines are being
+            // read, it should be enough to check (1) only.
+            if (pos > headerBufferSize
+                    || buf.length - pos < socketReadBufferSize) {
+                throw new IllegalArgumentException(
+                        sm.getString("iib.requestheadertoolarge.error"));
+            }
+        } while ( status == HeaderParseStatus.HAVE_MORE_HEADERS );
+        if (status == HeaderParseStatus.DONE) {
+            parsingHeader = false;
+            end = pos;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+
+    /**
+     * Parse an HTTP header.
+     *
+     * @return false after reading a blank line (which indicates that the
+     * HTTP header parsing is done
+     */
+    private HeaderParseStatus parseHeader()
+        throws IOException {
+
+        //
+        // Check for blank line
+        //
+
+        byte chr = 0;
+        while (headerParsePos == HeaderParsePosition.HEADER_START) {
+
+            // Read new bytes if needed
+            if (pos >= lastValid) {
+                if (!fill(false)) {//parse header
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
+            }
+
+            chr = buf[pos];
+
+            if (chr == Constants.CR) {
+                // Skip
+            } else if (chr == Constants.LF) {
+                pos++;
+                return HeaderParseStatus.DONE;
+            } else {
+                break;
+            }
+
+            pos++;
+
+        }
+
+        if ( headerParsePos == HeaderParsePosition.HEADER_START ) {
+            // Mark the current buffer position
+            headerData.start = pos;
+            headerParsePos = HeaderParsePosition.HEADER_NAME;
+        }
+
+        //
+        // Reading the header name
+        // Header name is always US-ASCII
+        //
+
+        while (headerParsePos == HeaderParsePosition.HEADER_NAME) {
+
+            // Read new bytes if needed
+            if (pos >= lastValid) {
+                if (!fill(false)) { //parse header
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
+            }
+
+            chr = buf[pos];
+            if (chr == Constants.COLON) {
+                headerParsePos = HeaderParsePosition.HEADER_VALUE_START;
+                headerData.headerValue = headers.addValue(buf, headerData.start, pos - headerData.start);
+                pos++;
+                // Mark the current buffer position
+                headerData.start = pos;
+                headerData.realPos = pos;
+                headerData.lastSignificantChar = pos;
+                break;
+            } else if (!HTTP_TOKEN_CHAR[chr]) {
+                // If a non-token header is detected, skip the line and
+                // ignore the header
+                headerData.lastSignificantChar = pos;
+                return skipLine();
+            }
+
+            // chr is next byte of header name. Convert to lowercase.
+            if ((chr >= Constants.A) && (chr <= Constants.Z)) {
+                buf[pos] = (byte) (chr - Constants.LC_OFFSET);
+            }
+            pos++;
+        }
+
+        // Skip the line and ignore the header
+        if (headerParsePos == HeaderParsePosition.HEADER_SKIPLINE) {
+            return skipLine();
+        }
+
+        //
+        // Reading the header value (which can be spanned over multiple lines)
+        //
+
+        while (headerParsePos == HeaderParsePosition.HEADER_VALUE_START ||
+               headerParsePos == HeaderParsePosition.HEADER_VALUE ||
+               headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE) {
+
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE_START ) {
+                // Skipping spaces
+                while (true) {
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(false)) {//parse header
+                            //HEADER_VALUE_START
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
+                    }
+
+                    chr = buf[pos];
+                    if (chr == Constants.SP || chr == Constants.HT) {
+                        pos++;
+                    } else {
+                        headerParsePos = HeaderParsePosition.HEADER_VALUE;
+                        break;
+                    }
+                }
+            }
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) {
+
+                // Reading bytes until the end of the line
+                boolean eol = false;
+                while (!eol) {
+
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(false)) {//parse header
+                            //HEADER_VALUE
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
+                    }
+
+                    chr = buf[pos];
+                    if (chr == Constants.CR) {
+                        // Skip
+                    } else if (chr == Constants.LF) {
+                        eol = true;
+                    } else if (chr == Constants.SP || chr == Constants.HT) {
+                        buf[headerData.realPos] = chr;
+                        headerData.realPos++;
+                    } else {
+                        buf[headerData.realPos] = chr;
+                        headerData.realPos++;
+                        headerData.lastSignificantChar = headerData.realPos;
+                    }
+
+                    pos++;
+                }
+
+                // Ignore whitespaces at the end of the line
+                headerData.realPos = headerData.lastSignificantChar;
+
+                // Checking the first character of the new line. If the character
+                // is a LWS, then it's a multiline header
+                headerParsePos = HeaderParsePosition.HEADER_MULTI_LINE;
+            }
+            // Read new bytes if needed
+            if (pos >= lastValid) {
+                if (!fill(false)) {//parse header
+                    //HEADER_MULTI_LINE
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
+            }
+
+            chr = buf[pos];
+            if ( headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE ) {
+                if ( (chr != Constants.SP) && (chr != Constants.HT)) {
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                    break;
+                } else {
+                    // Copying one extra space in the buffer (since there must
+                    // be at least one space inserted between the lines)
+                    buf[headerData.realPos] = chr;
+                    headerData.realPos++;
+                    headerParsePos = HeaderParsePosition.HEADER_VALUE_START;
+                }
+            }
+        }
+        // Set the header value
+        headerData.headerValue.setBytes(buf, headerData.start,
+                headerData.lastSignificantChar - headerData.start);
+        headerData.recycle();
+        return HeaderParseStatus.HAVE_MORE_HEADERS;
+    }
+
+    public int getParsingRequestLinePhase() {
+        return parsingRequestLinePhase;
+    }
+
+    private HeaderParseStatus skipLine() throws IOException {
+        headerParsePos = HeaderParsePosition.HEADER_SKIPLINE;
+        boolean eol = false;
+
+        // Reading bytes until the end of the line
+        while (!eol) {
+
+            // Read new bytes if needed
+            if (pos >= lastValid) {
+                if (!fill(false)) {
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
+            }
+
+            if (buf[pos] == Constants.CR) {
+                // Skip
+            } else if (buf[pos] == Constants.LF) {
+                eol = true;
+            } else {
+                headerData.lastSignificantChar = pos;
+            }
+
+            pos++;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("iib.invalidheader", new String(buf,
+                    headerData.start,
+                    headerData.lastSignificantChar - headerData.start + 1,
+                    StandardCharsets.ISO_8859_1)));
+        }
+
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        return HeaderParseStatus.HAVE_MORE_HEADERS;
+    }
+
+    private final HeaderParseData headerData = new HeaderParseData();
+    public static class HeaderParseData {
+        /**
+         * When parsing header name: first character of the header.<br />
+         * When skipping broken header line: first character of the header.<br />
+         * When parsing header value: first character after ':'.
+         */
+        int start = 0;
+        /**
+         * When parsing header name: not used (stays as 0).<br />
+         * When skipping broken header line: not used (stays as 0).<br />
+         * When parsing header value: starts as the first character after ':'.
+         * Then is increased as far as more bytes of the header are harvested.
+         * Bytes from buf[pos] are copied to buf[realPos]. Thus the string from
+         * [start] to [realPos-1] is the prepared value of the header, with
+         * whitespaces removed as needed.<br />
+         */
+        int realPos = 0;
+        /**
+         * When parsing header name: not used (stays as 0).<br />
+         * When skipping broken header line: last non-CR/non-LF character.<br />
+         * When parsing header value: position after the last not-LWS character.<br />
+         */
+        int lastSignificantChar = 0;
+        /**
+         * MB that will store the value of the header. It is null while parsing
+         * header name and is created after the name has been parsed.
+         */
+        MessageBytes headerValue = null;
+        public void recycle() {
+            start = 0;
+            realPos = 0;
+            lastSignificantChar = 0;
+            headerValue = null;
+        }
+    }
+
+
+    // ------------------------------------------------------ Protected Methods
+
+    @Override
+    protected void init(SocketWrapper<Nio2Channel> socketWrapper,
+            AbstractEndpoint<Nio2Channel> associatedEndpoint) throws IOException {
+
+        endpoint = associatedEndpoint;
+        socket = socketWrapper;
+        if (socket == null) {
+            // Socket has been closed in another thread
+            throw new IOException(sm.getString("iib.socketClosed"));
+        }
+        socketReadBufferSize =
+            socket.getSocket().getBufHandler().getReadBuffer().capacity();
+
+        int bufLength = headerBufferSize + socketReadBufferSize;
+        if (buf == null || buf.length < bufLength) {
+            buf = new byte[bufLength];
+        }
+
+        // Initialize the completion handler
+        this.completionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+
+            @Override
+            public void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+                boolean notify = false;
+                synchronized (completionHandler) {
+                    if (nBytes < 0) {
+                        failed(new ClosedChannelException(), attachment);
+                        return;
+                    }
+                    readPending = false;
+                    if (!Nio2Endpoint.isInline()) {
+                        notify = true;
+                    }
+                }
+                if (notify) {
+                    endpoint.processSocket(attachment, SocketStatus.OPEN_READ, false);
+                }
+            }
+
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                attachment.setError(true);
+                if (exc instanceof IOException) {
+                    e = (IOException) exc;
+                } else {
+                    e = new IOException(exc);
+                }
+                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
+                readPending = false;
+                endpoint.processSocket(attachment, SocketStatus.OPEN_READ, true);
+            }
+        };
+    }
+
+    @Override
+    protected boolean fill(boolean block) throws IOException, EOFException {
+        if (e != null) {
+            throw e;
+        }
+        if (parsingHeader) {
+            if (lastValid > headerBufferSize) {
+                throw new IllegalArgumentException
+                    (sm.getString("iib.requestheadertoolarge.error"));
+            }
+        } else {
+            lastValid = pos = end;
+        }
+        // Now fill the internal buffer
+        int nRead = 0;
+        ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getReadBuffer();
+        if (block) {
+            if (!flipped) {
+                byteBuffer.flip();
+                flipped = true;
+            }
+            int nBytes = byteBuffer.remaining();
+            // This case can happen when a blocking read follows a non blocking
+            // fill that completed asynchronously
+            if (nBytes > 0) {
+                expand(nBytes + pos);
+                byteBuffer.get(buf, pos, nBytes);
+                lastValid = pos + nBytes;
+                byteBuffer.clear();
+                flipped = false;
+                return true;
+            } else {
+                byteBuffer.clear();
+                flipped = false;
+                try {
+                    nRead = socket.getSocket().read(byteBuffer).get(socket.getTimeout(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException | ExecutionException
+                        | TimeoutException e) {
+                    throw new EOFException(sm.getString("iib.eof.error"));
+                }
+                if (nRead > 0) {
+                    if (!flipped) {
+                        byteBuffer.flip();
+                        flipped = true;
+                    }
+                    expand(nRead + pos);
+                    byteBuffer.get(buf, pos, nRead);
+                    lastValid = pos + nRead;
+                    return true;
+                } else if (nRead == -1) {
+                    //return false;
+                    throw new EOFException(sm.getString("iib.eof.error"));
+                } else {
+                    return false;
+                }
+            }
+        } else {
+            synchronized (completionHandler) {
+                if (!readPending) {
+                    if (!flipped) {
+                        byteBuffer.flip();
+                        flipped = true;
+                    }
+                    int nBytes = byteBuffer.remaining();
+                    if (nBytes > 0) {
+                        expand(nBytes + pos);
+                        byteBuffer.get(buf, pos, nBytes);
+                        lastValid = pos + nBytes;
+                        byteBuffer.clear();
+                        flipped = false;
+                    } else {
+                        byteBuffer.clear();
+                        flipped = false;
+                        readPending = true;
+                        Nio2Endpoint.startInline();
+                        try {
+                            socket.getSocket().read(byteBuffer, socket.getTimeout(),
+                                    TimeUnit.MILLISECONDS, socket, completionHandler);
+                        } catch (ReadPendingException e) {
+                            // Ignore ?
+                        }
+                        Nio2Endpoint.endInline();
+                        // Return the number of bytes that have been placed into the buffer
+                        if (!readPending) {
+                            // If the completion handler completed immediately
+                            if (!flipped) {
+                                byteBuffer.flip();
+                                flipped = true;
+                            }
+                            nBytes = byteBuffer.remaining();
+                            if (nBytes > 0) {
+                                expand(nBytes + pos);
+                                byteBuffer.get(buf, pos, nBytes);
+                                lastValid = pos + nBytes;
+                            }
+                            byteBuffer.clear();
+                            flipped = false;
+                        }
+                    }
+                    return (lastValid - pos) > 0;
+                } else {
+                    return false;
+                }
+            }
+        }
+    }
+
+
+    // ------------------------------------- InputStreamInputBuffer Inner Class
+
+
+    /**
+     * This class is an input buffer which will read its data from an input
+     * stream.
+     */
+    protected class SocketInputBuffer
+        implements InputBuffer {
+
+
+        /**
+         * Read bytes into the specified chunk.
+         */
+        @Override
+        public int doRead(ByteChunk chunk, Request req )
+            throws IOException {
+
+            if (pos >= lastValid) {
+                if (!fill(true)) //read body, must be blocking, as the thread is inside the app
+                    return -1;
+            }
+            if (isBlocking()) {
+                int length = lastValid - pos;
+                chunk.setBytes(buf, pos, length);
+                pos = lastValid;
+                return (length);
+            } else {
+                synchronized (completionHandler) {
+                    int length = lastValid - pos;
+                    chunk.setBytes(buf, pos, length);
+                    pos = lastValid;
+                    return (length);
+                }
+            }
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org