You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2014/03/10 12:27:12 UTC

svn commit: r1575905 [3/4] - in /tomcat/trunk: ./ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/http11/upgrade/ java/org/apache/tomcat/util/net/ webapps/docs/ webapps/docs/config/

Added: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,1251 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadPendingException;
+import java.nio.file.StandardOpenOption;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSessionContext;
+import javax.net.ssl.X509KeyManager;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.collections.SynchronizedStack;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.SecureNio2Channel.ApplicationBufferHandler;
+import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
+
+/**
+ * NIO2 endpoint.
+ */
+public class Nio2Endpoint extends AbstractEndpoint<Nio2Channel> {
+
+
+    // -------------------------------------------------------------- Constants
+
+
+    private static final Log log = LogFactory.getLog(Nio2Endpoint.class);
+
+
+    public static final int OP_REGISTER = 0x100; //register interest op
+    public static final int OP_CALLBACK = 0x200; //callback interest op
+    public static final int OP_READ = 0x400; //read interest op
+    public static final int OP_WRITE = 0x800; //write interest op
+
+    // ----------------------------------------------------------------- Fields
+
+    /**
+     * Server socket "pointer".
+     */
+    private AsynchronousServerSocketChannel serverSock = null;
+
+    /**
+     * use send file
+     */
+    private boolean useSendfile = true;
+
+    /**
+     * The size of the OOM parachute.
+     */
+    private int oomParachute = 1024*1024;
+
+    /**
+     * Allows detecting if a completion handler completes inline.
+     */
+    private static ThreadLocal<Boolean> inlineCompletion = new ThreadLocal<>();
+
+    /**
+     * The oom parachute, when an OOM error happens,
+     * will release the data, giving the JVM instantly
+     * a chunk of data to be able to recover with.
+     */
+    private byte[] oomParachuteData = null;
+
+    /**
+     * Make sure this string has already been allocated
+     */
+    private static final String oomParachuteMsg =
+        "SEVERE:Memory usage is low, parachute is non existent, your system may start failing.";
+
+    /**
+     * Keep track of OOM warning messages.
+     */
+    private long lastParachuteCheck = System.currentTimeMillis();
+    
+    /**
+     * Cache for SocketProcessor objects
+     */
+    private final SynchronizedStack<SocketProcessor> processorCache =
+            new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
+                    socketProperties.getProcessorCache());
+
+    /**
+     * Cache for key attachment objects
+     */
+    private final SynchronizedStack<Nio2SocketWrapper> socketWrapperCache =
+            new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
+                    socketProperties.getSocketWrapperCache());
+
+    /**
+     * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
+     */
+    private final SynchronizedStack<Nio2Channel> nioChannels =
+            new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
+                    socketProperties.getBufferPoolSize());
+
+
+    // ------------------------------------------------------------- Properties
+
+
+    /**
+     * Use the object caches to reduce GC at the expense of additional memory use.
+     */
+    private boolean useCaches = false;
+    public void setUseCaches(boolean useCaches) { this.useCaches = useCaches; }
+    public boolean getUseCaches() { return useCaches; }
+
+
+    /**
+     * Priority of the poller threads. Defaults to Thread.NORM_PRIORITY.
+     * NOTE(review): nothing in the visible part of this class reads this
+     * value - confirm whether it is still needed for this endpoint.
+     */
+    private int pollerThreadPriority = Thread.NORM_PRIORITY;
+    public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; }
+    public int getPollerThreadPriority() { return pollerThreadPriority; }
+
+
+    /**
+     * Handling of accepted sockets.
+     */
+    private Handler handler = null;
+    public void setHandler(Handler handler ) { this.handler = handler; }
+    public Handler getHandler() { return handler; }
+
+
+    /**
+     * Allow comet request handling. Enabled by default.
+     */
+    private boolean useComet = true;
+    public void setUseComet(boolean useComet) { this.useComet = useComet; }
+    @Override
+    public boolean getUseComet() { return useComet; }
+    // Comet timeout support simply mirrors the comet setting.
+    @Override
+    public boolean getUseCometTimeout() { return getUseComet(); }
+    @Override
+    public boolean getUsePolling() { return true; } // Always supported
+
+    /**
+     * Replaces the socket properties used by this endpoint. The object caches
+     * of this class are sized from the socket properties in their field
+     * initializers and are not resized by this call.
+     */
+    public void setSocketProperties(SocketProperties socketProperties) {
+        this.socketProperties = socketProperties;
+    }
+
+    /**
+     * Enables or disables the use of sendfile.
+     */
+    public void setUseSendfile(boolean useSendfile) {
+        this.useSendfile = useSendfile;
+    }
+
+    /**
+     * Is deferAccept supported?
+     */
+    @Override
+    public boolean getDeferAccept() {
+        // Not supported
+        return false;
+    }
+
+    /**
+     * Sets the size, in bytes, of the OOM parachute buffer. A value that is
+     * not positive prevents the buffer from being allocated.
+     */
+    public void setOomParachute(int oomParachute) {
+        this.oomParachute = oomParachute;
+    }
+
+    /**
+     * Replaces the OOM parachute buffer itself. Passing <code>null</code>
+     * drops the current buffer.
+     */
+    public void setOomParachuteData(byte[] oomParachuteData) {
+        this.oomParachuteData = oomParachuteData;
+    }
+
+
+    // SSL context; created by bind() when SSL is enabled and cleared by unbind().
+    private SSLContext sslContext = null;
+    public SSLContext getSSLContext() { return sslContext;}
+    public void setSSLContext(SSLContext c) { sslContext = c;}
+    // Cipher suites and protocols computed by bind() and applied to every
+    // engine created by createSSLEngine().
+    private String[] enabledCiphers;
+    private String[] enabledProtocols;
+
+    /**
+     * Returns the port the server socket is bound to.
+     *
+     * @return the local port, or -1 when there is no server socket, its local
+     *         address is not an {@link InetSocketAddress}, or looking up the
+     *         address fails with an {@link IOException}
+     */
+    @Override
+    public int getLocalPort() {
+        // Work on a local copy so a concurrent unbind() cannot null the field
+        // between the check and the use.
+        AsynchronousServerSocketChannel channel = serverSock;
+        if (channel == null) {
+            return -1;
+        }
+        try {
+            SocketAddress local = channel.getLocalAddress();
+            // instanceof is false for null, so it also covers the unbound case
+            if (local instanceof InetSocketAddress) {
+                return ((InetSocketAddress) local).getPort();
+            }
+        } catch (IOException e) {
+            // Fall through and report -1
+        }
+        return -1;
+    }
+
+
+    /**
+     * Returns the cipher suites computed by bind() for SSL connections, or
+     * <code>null</code> if they have not been computed.
+     */
+    @Override
+    public String[] getCiphersUsed() {
+        return enabledCiphers;
+    }
+
+
+    // --------------------------------------------------------- OOM Parachute Methods
+
+    /**
+     * Tries to re-allocate the OOM parachute without forcing it and, when
+     * that is not possible, emits the low memory warning at most once every
+     * ten seconds.
+     */
+    protected void checkParachute() {
+        if (reclaimParachute(false)) {
+            // Parachute is in place, nothing to report
+            return;
+        }
+        long sinceLastWarning = System.currentTimeMillis() - lastParachuteCheck;
+        if (sinceLastWarning <= 10000) {
+            return;
+        }
+        try {
+            log.fatal(oomParachuteMsg);
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            // Logging itself failed, fall back to stderr
+            System.err.println(oomParachuteMsg);
+        }
+        lastParachuteCheck = System.currentTimeMillis();
+    }
+
+    /**
+     * Ensures the OOM parachute buffer is allocated.
+     *
+     * @param force when <code>true</code> the buffer is allocated without
+     *              checking the free memory first; otherwise it is only
+     *              allocated if more than twice its size is currently free
+     * @return <code>true</code> if the parachute buffer exists when this
+     *         method returns
+     */
+    protected boolean reclaimParachute(boolean force) {
+        if (oomParachuteData != null) {
+            return true;
+        }
+        // Widen to long before doubling: in int arithmetic oomParachute*2
+        // overflows to a negative value for sizes of 1GiB and above, which
+        // made the free memory check pass unconditionally.
+        if (oomParachute > 0
+                && (force || Runtime.getRuntime().freeMemory() > 2L * oomParachute)) {
+            oomParachuteData = new byte[oomParachute];
+        }
+        return oomParachuteData != null;
+    }
+
+    /**
+     * Empties the object caches (when caching is enabled) and asks the
+     * handler, if one is set, to recycle its own resources.
+     */
+    protected void releaseCaches() {
+        if (useCaches) {
+            socketWrapperCache.clear();
+            nioChannels.clear();
+            processorCache.clear();
+        }
+        if (handler == null) {
+            return;
+        }
+        handler.recycle();
+    }
+
+    // --------------------------------------------------------- Public Methods
+    
+    /**
+     * Number of keepalive sockets.
+     *
+     * @return always 0; this endpoint does not track keepalive sockets yet
+     */
+    public int getKeepAliveCount() {
+        return 0;
+        // FIXME: would need some specific statistics gathering
+    }
+
+
+    // ----------------------------------------------- Public Lifecycle Methods
+
+
+    /**
+     * Initialize the endpoint: make sure an executor exists, open and bind
+     * the asynchronous server socket, default the acceptor thread count,
+     * set up SSL when enabled and allocate the OOM parachute.
+     *
+     * @throws Exception if the server socket cannot be opened or bound, or
+     *                   the SSL setup fails
+     */
+    @Override
+    public void bind() throws Exception {
+
+        // Create worker collection
+        if ( getExecutor() == null ) {
+            createExecutor();
+        }
+        // The channel group is backed by the endpoint executor when it is an
+        // ExecutorService. Otherwise null is passed to open(), which per the
+        // JDK documentation selects the system-wide default group.
+        AsynchronousChannelGroup threadGroup = null;
+        if (getExecutor() instanceof ExecutorService) {
+            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
+        }
+
+        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
+        socketProperties.setProperties(serverSock);
+        // Bind to the configured address, or to the wildcard address if none is set
+        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
+        serverSock.bind(addr,getBacklog());
+
+        // Initialize the thread count default for the acceptor
+        if (acceptorThreadCount == 0) {
+            // NIO2 does not allow any form of IO concurrency
+            acceptorThreadCount = 1;
+        }
+
+        // Initialize SSL if needed
+        if (isSSLEnabled()) {
+            SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);
+
+            sslContext = sslUtil.createSSLContext();
+            // Key managers are wrapped so that a configured key alias is honored
+            sslContext.init(wrap(sslUtil.getKeyManagers()),
+                    sslUtil.getTrustManagers(), null);
+
+            SSLSessionContext sessionContext =
+                sslContext.getServerSessionContext();
+            if (sessionContext != null) {
+                sslUtil.configureSessionContext(sessionContext);
+            }
+            // Determine which cipher suites and protocols to enable
+            enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
+            enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
+        }
+
+        // Allocate the parachute up front, without checking the free memory
+        if (oomParachute>0) reclaimParachute(true);
+    }
+
+    public KeyManager[] wrap(KeyManager[] managers) {
+        if (managers==null) return null;
+        KeyManager[] result = new KeyManager[managers.length];
+        for (int i=0; i<result.length; i++) {
+            if (managers[i] instanceof X509KeyManager && getKeyAlias()!=null) {
+                result[i] = new NioX509KeyManager((X509KeyManager)managers[i],getKeyAlias());
+            } else {
+                result[i] = managers[i];
+            }
+        }
+        return result;
+    }
+
+
+    /**
+     * Start the NIO2 endpoint, creating the acceptor threads and the async
+     * timeout thread. Does nothing if the endpoint is already running.
+     */
+    @Override
+    public void startInternal() throws Exception {
+
+        if (!running) {
+            running = true;
+            paused = false;
+
+            // FIXME: remove when more stable
+            log.warn("The NIO2 connector is currently EXPERIMENTAL and should not be used in production");
+
+            // Create worker collection
+            if ( getExecutor() == null ) {
+                createExecutor();
+            }
+
+            initializeConnectionLatch();
+            startAcceptorThreads();
+
+            // Start async timeout thread
+            Thread timeoutThread = new Thread(new AsyncTimeout(),
+                    getName() + "-AsyncTimeout");
+            timeoutThread.setPriority(threadPriority);
+            timeoutThread.setDaemon(true);
+            timeoutThread.start();
+        }
+    }
+
+
+    /**
+     * Stop the endpoint. This will cause all processing threads to stop.
+     * The server socket itself is left open; it is closed by unbind().
+     */
+    @Override
+    public void stopInternal() {
+        releaseConnectionLatch();
+        if (!paused) {
+            pause();
+        }
+        if (running) {
+            // Clearing the flag ends the acceptor and timeout loops
+            running = false;
+            unlockAccept();
+        }
+        if (useCaches) {
+            socketWrapperCache.clear();
+            nioChannels.clear();
+            processorCache.clear();
+        }
+    }
+
+
+    /**
+     * Stop the endpoint if it is still running, close the server socket,
+     * drop the SSL context, shut down the executor and release the caches.
+     *
+     * @throws Exception if stopping the endpoint or closing the server
+     *                   socket fails
+     */
+    @Override
+    public void unbind() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Destroy initiated for "+new InetSocketAddress(getAddress(),getPort()));
+        }
+        if (running) {
+            stop();
+        }
+        // Close server socket
+        serverSock.close();
+        serverSock = null;
+        sslContext = null;
+        // Unlike other connectors, the thread pool is tied to the server socket
+        shutdownExecutor();
+        releaseCaches();
+        if (log.isDebugEnabled()) {
+            log.debug("Destroy completed for "+new InetSocketAddress(getAddress(),getPort()));
+        }
+    }
+
+
+    // ------------------------------------------------------ Protected Methods
+
+
+    /**
+     * Returns the transmit buffer size taken from the socket properties.
+     */
+    public int getWriteBufSize() {
+        return socketProperties.getTxBufSize();
+    }
+
+    /**
+     * Returns the receive buffer size taken from the socket properties.
+     */
+    public int getReadBufSize() {
+        return socketProperties.getRxBufSize();
+    }
+
+    /**
+     * Returns whether sendfile is enabled for this endpoint.
+     */
+    @Override
+    public boolean getUseSendfile() {
+        return useSendfile;
+    }
+
+    /**
+     * Returns the configured size, in bytes, of the OOM parachute.
+     */
+    public int getOomParachute() {
+        return oomParachute;
+    }
+
+    /**
+     * Returns the OOM parachute buffer itself (not a copy), or
+     * <code>null</code> if it is not currently allocated.
+     */
+    public byte[] getOomParachuteData() {
+        return oomParachuteData;
+    }
+
+
+    /**
+     * Creates the acceptor used by this endpoint.
+     */
+    @Override
+    protected AbstractEndpoint.Acceptor createAcceptor() {
+        return new Acceptor();
+    }
+
+
+    /**
+     * Process the specified connection: apply the socket properties, obtain
+     * a channel and a socket wrapper (from the caches when enabled, otherwise
+     * newly created) and dispatch the wrapper for processing with
+     * {@link SocketStatus#OPEN_READ}.
+     *
+     * @param socket the newly accepted channel
+     * @return <code>true</code> if the setup completed without an error,
+     *         <code>false</code> if it failed and the caller should close the
+     *         socket
+     */
+    protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
+        // Process the connection
+        try {
+            socketProperties.setProperties(socket);
+
+            Nio2Channel channel = (useCaches) ? nioChannels.pop() : null;
+            if (channel == null) {
+                // SSL setup
+                if (sslContext != null) {
+                    SSLEngine engine = createSSLEngine();
+                    // The read buffer must be able to hold a full SSL application record
+                    int appbufsize = engine.getSession().getApplicationBufferSize();
+                    NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
+                            socketProperties.getAppWriteBufSize(),
+                            socketProperties.getDirectBuffer());
+                    channel = new SecureNio2Channel(socket, engine, bufhandler, this);
+                } else {
+                    // normal tcp setup
+                    NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
+                                                                       socketProperties.getAppWriteBufSize(),
+                                                                       socketProperties.getDirectBuffer());
+
+                    channel = new Nio2Channel(socket, bufhandler);
+                }
+            } else {
+                // Recycled channel: attach the new socket and reset its state
+                channel.setIOChannel(socket);
+                if ( channel instanceof SecureNio2Channel ) {
+                    SSLEngine engine = createSSLEngine();
+                    ((SecureNio2Channel)channel).reset(engine);
+                } else {
+                    channel.reset();
+                }
+            }
+            Nio2SocketWrapper socketWrapper = (useCaches) ? socketWrapperCache.pop() : null;
+            if (socketWrapper == null) {
+                socketWrapper = new Nio2SocketWrapper(channel);
+            }
+            socketWrapper.reset(channel, getSocketProperties().getSoTimeout());
+            socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
+            socketWrapper.setSecure(isSSLEnabled());
+            if (sslContext != null) {
+                ((SecureNio2Channel) channel).setSocket(socketWrapper);
+            }
+            processSocket(socketWrapper, SocketStatus.OPEN_READ, true);
+            // FIXME: In theory, awaitBytes is better, but the SSL handshake is done by processSocket
+            //awaitBytes(socketWrapper);
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            try {
+                log.error("",t);
+            } catch (Throwable tt) {
+                // Inspect the failure raised by the logging call itself; the
+                // original error has already been handled above.
+                ExceptionUtils.handleThrowable(tt);
+            }
+            // Tell to close the socket
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a server mode SSL engine from the endpoint SSL context,
+     * configured with the client authentication setting and the enabled
+     * cipher suites and protocols, and passes it to the handler for any
+     * further customization.
+     *
+     * @return the configured engine
+     */
+    protected SSLEngine createSSLEngine() {
+        SSLEngine sslEngine = sslContext.createSSLEngine();
+        String clientAuth = getClientAuth();
+        if ("false".equals(clientAuth)) {
+            sslEngine.setNeedClientAuth(false);
+            sslEngine.setWantClientAuth(false);
+        } else if ("true".equals(clientAuth) || "yes".equals(clientAuth)) {
+            sslEngine.setNeedClientAuth(true);
+        } else if ("want".equals(clientAuth)) {
+            sslEngine.setWantClientAuth(true);
+        }
+        // Any other value leaves the engine defaults untouched
+        sslEngine.setUseClientMode(false);
+        sslEngine.setEnabledCipherSuites(enabledCiphers);
+        sslEngine.setEnabledProtocols(enabledProtocols);
+
+        handler.onCreateSSLEngine(sslEngine);
+        return sslEngine;
+    }
+
+
+    /**
+     * Returns true if a worker thread is available for processing.
+     * This implementation performs no check and always reports true.
+     * @return boolean
+     */
+    protected boolean isWorkerAvailable() {
+        return true;
+    }
+
+
+    /**
+     * Processes the given socket with the given status by delegating to
+     * processSocket0(); the success flag returned by that method is dropped.
+     */
+    @Override
+    public void processSocket(SocketWrapper<Nio2Channel> socketWrapper,
+            SocketStatus socketStatus, boolean dispatch) {
+        processSocket0(socketWrapper, socketStatus, dispatch);
+    }
+
+    /**
+     * Runs a socket processor for the given socket and status, either on the
+     * executor or inline on the calling thread.
+     *
+     * @param socket   the socket wrapper to process
+     * @param status   the event to process
+     * @param dispatch <code>true</code> to hand the work to the executor (if
+     *                 there is one), <code>false</code> to run it inline
+     * @return <code>true</code> if the processor was submitted or ran,
+     *         <code>false</code> if the executor rejected it or any other
+     *         error occurred
+     */
+    protected boolean processSocket0(SocketWrapper<Nio2Channel> socket, SocketStatus status, boolean dispatch) {
+        try {
+            ((Nio2SocketWrapper) socket).setCometNotify(false); //will get reset upon next reg
+            SocketProcessor processor = null;
+            if (useCaches) {
+                processor = processorCache.pop();
+            }
+            if (processor != null) {
+                processor.reset(socket, status);
+            } else {
+                processor = new SocketProcessor(socket, status);
+            }
+            Executor pool = getExecutor();
+            if (!dispatch || pool == null) {
+                processor.run();
+            } else {
+                pool.execute(processor);
+            }
+            return true;
+        } catch (RejectedExecutionException ree) {
+            log.warn(sm.getString("endpoint.executor.fail", socket), ree);
+            return false;
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            // This means we got an OOM or similar creating a thread, or that
+            // the pool and its queue are full
+            log.error(sm.getString("endpoint.process.fail"), t);
+            return false;
+        }
+    }
+
+    /**
+     * Closes the given socket: notifies a comet request of the status,
+     * releases the socket from the handler, closes the channel and any open
+     * sendfile channel, resets the wrapper and counts the connection down.
+     * Errors are handled here and never reach the caller.
+     *
+     * @param socket the socket wrapper to close, may be <code>null</code>
+     * @param status the reason for closing; only used for comet sockets
+     */
+    public void closeSocket(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
+        try {
+            Nio2SocketWrapper ka = (Nio2SocketWrapper) socket;
+            if (socket != null && socket.isComet() && status != null) {
+                socket.setComet(false);//to avoid a loop
+                if (status == SocketStatus.TIMEOUT ) {
+                    if (processSocket0(socket, status, true)) {
+                        return; // don't close on comet timeout
+                    }
+                } else {
+                    // Don't dispatch if the lines below are canceling the key
+                    processSocket0(socket, status, false);
+                }
+            }
+            if (socket!=null) handler.release(socket);
+            try {
+                if (socket!=null) {
+                    socket.getSocket().close(true);
+                }
+            } catch (Exception e){
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString(
+                            "endpoint.debug.socketCloseFail"), e);
+                }
+            }
+            // Best effort close of a sendfile channel that is still open
+            try {
+                if (ka != null && ka.getSendfileData() != null
+                        && ka.getSendfileData().fchannel != null
+                        && ka.getSendfileData().fchannel.isOpen()) {
+                    ka.getSendfileData().fchannel.close();
+                }
+            } catch (Exception ignore) {
+            }
+            if (ka!=null) {
+                ka.reset();
+                countDownConnection();
+            }
+        } catch (Throwable e) {
+            ExceptionUtils.handleThrowable(e);
+            // NOTE(review): reported at error level but only when debug
+            // logging is enabled - confirm that this is intended.
+            if (log.isDebugEnabled()) log.error("",e);
+        }
+    }
+
+    /**
+     * Returns the logger of this endpoint class.
+     */
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+
+    // --------------------------------------------------- Acceptor Inner Class
+
+    /**
+     * The acceptor thread. While the endpoint is running it waits for a free
+     * connection slot, blocks until the server socket has accepted the next
+     * connection and then configures and dispatches that connection. While
+     * the endpoint is paused it sleeps and re-checks the state every 50ms.
+     * Async timeouts are not handled here but by the AsyncTimeout thread.
+     */
+    protected class Acceptor extends AbstractEndpoint.Acceptor {
+
+        @Override
+        public void run() {
+
+            int errorDelay = 0;
+
+            // Loop until we receive a shutdown command
+            while (running) {
+
+                // Loop if endpoint is paused
+                while (paused && running) {
+                    state = AcceptorState.PAUSED;
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+
+                if (!running) {
+                    break;
+                }
+                state = AcceptorState.RUNNING;
+
+                try {
+                    //if we have reached max connections, wait
+                    countUpOrAwaitConnection();
+
+                    AsynchronousSocketChannel socket = null;
+                    try {
+                        // Accept the next incoming connection from the server
+                        // socket
+                        socket = serverSock.accept().get();
+                    } catch (Exception e) {
+                        // Give back the slot reserved above, no connection was made
+                        countDownConnection();
+                        // Introduce delay if necessary
+                        errorDelay = handleExceptionWithDelay(errorDelay);
+                        // re-throw
+                        throw e;
+                    }
+                    // Successful accept, reset the error delay
+                    errorDelay = 0;
+
+                    // Configure the socket
+                    if (running && !paused) {
+                        // Hand this socket off to an appropriate processor
+                        if (!setSocketOptions(socket)) {
+                            countDownConnection();
+                            closeSocket(socket);
+                        }
+                    } else {
+                        countDownConnection();
+                        // Close socket right away
+                        closeSocket(socket);
+                    }
+                } catch (NullPointerException npe) {
+                    // Only reported while running, unlike other failures below
+                    if (running) {
+                        log.error(sm.getString("endpoint.accept.fail"), npe);
+                    }
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    log.error(sm.getString("endpoint.accept.fail"), t);
+                }
+            }
+            state = AcceptorState.ENDED;
+        }
+
+    }
+
+    /**
+     * Async timeout thread
+     */
+    protected class AsyncTimeout implements Runnable {
+        /**
+         * Checks the waiting requests about once per second and triggers a
+         * timeout for each one that has been inactive for longer than its
+         * timeout. While the endpoint is paused no checks are made.
+         */
+        @Override
+        public void run() {
+
+            // Loop until we receive a shutdown command
+            while (running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+                long now = System.currentTimeMillis();
+                for (SocketWrapper<Nio2Channel> waiting : waitingRequests.keySet()) {
+                    long idle = now - waiting.getLastAccess();
+                    // A timeout that is not positive disables the check
+                    if (waiting.getTimeout() > 0 && idle > waiting.getTimeout()) {
+                        processSocket(waiting, SocketStatus.TIMEOUT, true);
+                    }
+                }
+
+                // Loop if endpoint is paused
+                while (paused && running) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+
+            }
+        }
+    }
+
+
+    /**
+     * Closes a raw accepted channel that was never handed to a processor.
+     * A failure to close is only logged, at debug level.
+     */
+    private void closeSocket(AsynchronousSocketChannel socket) {
+        try {
+            socket.close();
+        } catch (IOException ioe) {
+            if (log.isDebugEnabled()) {
+                log.debug("", ioe);
+            }
+        }
+    }
+
+
+    public static class Nio2SocketWrapper extends SocketWrapper<Nio2Channel> {
+
+        public Nio2SocketWrapper(Nio2Channel channel) {
+            super(channel);
+        }
+
+        public void reset(Nio2Channel channel, long soTimeout) {
+            super.reset(channel, soTimeout);
+            upgradeInit = false;
+            cometNotify = false;
+            interestOps = 0;
+            sendfileData = null;
+            if (readLatch != null) {
+                try {
+                    for (int i = 0; i < (int) readLatch.getCount(); i++) {
+                        readLatch.countDown();
+                    }
+                } catch (Exception ignore) {
+                }
+            }
+            readLatch = null;
+            sendfileData = null;
+            if (writeLatch != null) {
+                try {
+                    for (int i = 0; i < (int) writeLatch.getCount(); i++) {
+                        writeLatch.countDown();
+                    }
+                } catch (Exception ignore) {
+                }
+            }
+            writeLatch = null;
+            setWriteTimeout(soTimeout);
+        }
+
+        public void reset() {
+            reset(null, -1);
+        }
+
+        @Override
+        public long getTimeout() {
+            long timeout = super.getTimeout();
+            return (timeout > 0) ? timeout : Long.MAX_VALUE;
+        }
+        public void setUpgraded(boolean upgraded) {
+            if (upgraded && !isUpgraded()) {
+                upgradeInit = true;
+            }
+            super.setUpgraded(upgraded);
+        }
+        public boolean isUpgradeInit() {
+            boolean value = upgradeInit;
+            upgradeInit = false;
+            return value;
+        }
+        public void setCometNotify(boolean notify) { this.cometNotify = notify; }
+        public boolean getCometNotify() { return cometNotify; }
+        public Nio2Channel getChannel() { return getSocket();}
+        public int interestOps() { return interestOps;}
+        public int interestOps(int ops) { this.interestOps  = ops; return ops; }
+        public CountDownLatch getReadLatch() { return readLatch; }
+        public CountDownLatch getWriteLatch() { return writeLatch; }
+        protected CountDownLatch resetLatch(CountDownLatch latch) {
+            if ( latch==null || latch.getCount() == 0 ) return null;
+            else throw new IllegalStateException("Latch must be at count 0");
+        }
+        public void resetReadLatch() { readLatch = resetLatch(readLatch); }
+        public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
+
+        protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
+            if ( latch == null || latch.getCount() == 0 ) {
+                return new CountDownLatch(cnt);
+            }
+            else throw new IllegalStateException("Latch must be at count 0 or null.");
+        }
+        public void startReadLatch(int cnt) { readLatch = startLatch(readLatch,cnt);}
+        public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch,cnt);}
+
+        protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
+            if ( latch == null ) throw new IllegalStateException("Latch cannot be null");
+            // Note: While the return value is ignored if the latch does time
+            //       out, logic further up the call stack will trigger a
+            //       SocketTimeoutException
+            latch.await(timeout,unit);
+        }
+        public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);}
+        public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);}
+
+        public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
+        public SendfileData getSendfileData() { return this.sendfileData;}
+
+        public void setWriteTimeout(long writeTimeout) {
+            if (writeTimeout <= 0) {
+                this.writeTimeout = Long.MAX_VALUE;
+            } else {
+                this.writeTimeout = writeTimeout;
+            }
+        }
+        public long getWriteTimeout() {return this.writeTimeout;}
+
+        private int interestOps = 0;
+        private boolean cometNotify = false;
+        private CountDownLatch readLatch = null;
+        private CountDownLatch writeLatch = null;
+        private SendfileData sendfileData = null;
+        private long writeTimeout = -1;
+        private boolean upgradeInit = false;
+
+    }
+
+    // ------------------------------------------------ Application Buffer Handler
+    /**
+     * Application buffer handler holding one read and one write buffer of
+     * fixed size, allocated once at construction time.
+     */
+    public static class NioBufferHandler implements ApplicationBufferHandler {
+        private final ByteBuffer readbuf;
+        private final ByteBuffer writebuf;
+
+        /**
+         * @param readsize  capacity of the read buffer
+         * @param writesize capacity of the write buffer
+         * @param direct    <code>true</code> to allocate direct buffers,
+         *                  <code>false</code> for heap buffers
+         */
+        public NioBufferHandler(int readsize, int writesize, boolean direct) {
+            readbuf = newBuffer(readsize, direct);
+            writebuf = newBuffer(writesize, direct);
+        }
+
+        // Allocates a direct or a heap buffer of the given capacity
+        private static ByteBuffer newBuffer(int capacity, boolean direct) {
+            return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
+        }
+
+        /**
+         * Buffers are never grown: the given buffer is returned as is.
+         */
+        @Override
+        public ByteBuffer expand(ByteBuffer buffer, int remaining) {
+            return buffer;
+        }
+
+        @Override
+        public ByteBuffer getReadBuffer() {
+            return readbuf;
+        }
+
+        @Override
+        public ByteBuffer getWriteBuffer() {
+            return writebuf;
+        }
+
+    }
+
+    // ------------------------------------------------ Handler Inner Interface
+
+
+    /**
+     * Minimal interface used for socket processing. Per thread data is to be
+     * stored in the ThreadWithAttributes extra folders, or alternatively in
+     * thread local fields.
+     */
+    public interface Handler extends AbstractEndpoint.Handler {
+
+        /**
+         * Processes the given socket for the given status.
+         *
+         * @param socket the socket wrapper to process
+         * @param status the status that triggered the processing
+         * @return the state of the socket after processing
+         */
+        SocketState process(SocketWrapper<Nio2Channel> socket, SocketStatus status);
+
+        /**
+         * Releases the given socket wrapper.
+         *
+         * @param socket the socket wrapper to release
+         */
+        void release(SocketWrapper<Nio2Channel> socket);
+
+        /**
+         * @return the SSL implementation in use
+         */
+        SSLImplementation getSslImplementation();
+
+        /**
+         * Callback invoked with an SSL engine.
+         *
+         * @param engine the SSL engine
+         */
+        void onCreateSSLEngine(SSLEngine engine);
+    }
+
+    /**
+     * Socket wrappers registered through {@code addTimeout} and unregistered
+     * through {@code removeTimeout}. Each wrapper is stored as both key and
+     * value.
+     */
+    protected ConcurrentHashMap<SocketWrapper<Nio2Channel>, SocketWrapper<Nio2Channel>> waitingRequests =
+            new ConcurrentHashMap<>();
+
+    /**
+     * Completion handler for the asynchronous read started by
+     * {@code awaitBytes(SocketWrapper)}. A non-negative byte count dispatches
+     * the socket with {@code OPEN_READ}; a negative count or a failure
+     * dispatches it with {@code DISCONNECT}.
+     */
+    private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> awaitBytes
+            = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+
+        @Override
+        public synchronized void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+            if (nBytes >= 0) {
+                processSocket0(attachment, SocketStatus.OPEN_READ, true);
+            } else {
+                // End of stream is handled exactly like a failed read
+                failed(new ClosedChannelException(), attachment);
+            }
+        }
+
+        @Override
+        public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+            processSocket0(attachment, SocketStatus.DISCONNECT, true);
+        }
+    };
+
+    /**
+     * Registers the socket wrapper in {@code waitingRequests}.
+     *
+     * @param socket the socket wrapper to register
+     */
+    public void addTimeout(SocketWrapper<Nio2Channel> socket) {
+        this.waitingRequests.put(socket, socket);
+    }
+
+    /**
+     * Removes the socket wrapper from {@code waitingRequests}.
+     *
+     * @param socket the socket wrapper to remove
+     * @return {@code true} if the wrapper was registered, {@code false}
+     *         otherwise
+     */
+    public boolean removeTimeout(SocketWrapper<Nio2Channel> socket) {
+        SocketWrapper<Nio2Channel> previous = waitingRequests.remove(socket);
+        return previous != null;
+    }
+
+    /**
+     * Sets the {@code inlineCompletion} flag to {@code TRUE}.
+     */
+    public static void startInline() {
+        final Boolean inline = Boolean.TRUE;
+        inlineCompletion.set(inline);
+    }
+
+    /**
+     * Sets the {@code inlineCompletion} flag to {@code FALSE}.
+     */
+    public static void endInline() {
+        final Boolean inline = Boolean.FALSE;
+        inlineCompletion.set(inline);
+    }
+
+    /**
+     * @return the current value of the {@code inlineCompletion} flag, or
+     *         {@code false} if the flag has never been set
+     */
+    public static boolean isInline() {
+        final Boolean inline = inlineCompletion.get();
+        return inline != null && inline.booleanValue();
+    }
+
+    /**
+     * Starts an asynchronous read into the socket's cleared read buffer, with
+     * the socket's timeout and the {@code awaitBytes} completion handler.
+     * Does nothing if the wrapper or its channel is {@code null}.
+     *
+     * @param socket the socket wrapper to read from
+     */
+    public void awaitBytes(SocketWrapper<Nio2Channel> socket) {
+        if (socket != null && socket.getSocket() != null) {
+            ByteBuffer readBuffer = socket.getSocket().getBufHandler().getReadBuffer();
+            readBuffer.clear();
+            try {
+                socket.getSocket().read(readBuffer, socket.getTimeout(),
+                        TimeUnit.MILLISECONDS, socket, awaitBytes);
+            } catch (ReadPendingException ignored) {
+                // Ignore
+            }
+        }
+    }
+
+    public boolean processSendfile(final Nio2SocketWrapper socket) {
+
+        // Configure the send file data
+        SendfileData data = socket.getSendfileData();
+        if (data.fchannel == null || !data.fchannel.isOpen()) {
+            java.nio.file.Path path = new File(data.fileName).toPath();
+            try {
+                data.fchannel = java.nio.channels.FileChannel
+                        .open(path, StandardOpenOption.READ).position(data.pos);
+            } catch (IOException e) {
+                closeSocket(socket, SocketStatus.ERROR);
+                return false;
+            }
+        }
+
+        final ByteBuffer buffer;
+        if (!socketProperties.getDirectBuffer() && sslContext == null) {
+            // If not using SSL and direct buffers are not used, the
+            // idea of sendfile is to avoid memory copies, so allocate a
+            // direct buffer
+            int BUFFER_SIZE;
+            try {
+                BUFFER_SIZE = socket.getSocket().getIOChannel().getOption(StandardSocketOptions.SO_SNDBUF);
+            } catch (IOException e) {
+                BUFFER_SIZE = 8192;
+            }
+            buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+        } else {
+            buffer = socket.getSocket().getBufHandler().getWriteBuffer();
+        }
+        int nr = -1;
+        try {
+            nr = data.fchannel.read(buffer);
+        } catch (IOException e1) {
+            closeSocket(socket, SocketStatus.ERROR);
+            return false;
+        }
+
+        if (nr >= 0) {
+            socket.getSocket().setSendFile(true);
+            buffer.flip();
+            socket.getSocket().write(buffer, data, new CompletionHandler<Integer, SendfileData>() {
+
+                @Override
+                public void completed(Integer nw, SendfileData attachment) {
+                    if (nw < 0) { // Reach the end of stream
+                        closeSocket(socket, SocketStatus.DISCONNECT);
+                        try {
+                            attachment.fchannel.close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                        return;
+                    }
+
+                    attachment.pos += nw;
+                    attachment.length -= nw;
+
+                    if (attachment.length <= 0) {
+                        socket.setSendfileData(null);
+                        try {
+                            attachment.fchannel.close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                        if (attachment.keepAlive) {
+                            socket.getSocket().setSendFile(false);
+                            awaitBytes(socket);
+                        } else {
+                            closeSocket(socket, SocketStatus.DISCONNECT);
+                        }
+                        return;
+                    }
+
+                    boolean ok = true;
+
+                    if (!buffer.hasRemaining()) {
+                        // This means that all data in the buffer has
+                        // been
+                        // written => Empty the buffer and read again
+                        buffer.clear();
+                        try {
+                            if (attachment.fchannel.read(buffer) >= 0) {
+                                buffer.flip();
+                                if (attachment.length < buffer.remaining()) {
+                                    buffer.limit(buffer.limit() - buffer.remaining() + (int) attachment.length);
+                                }
+                            } else {
+                                // Reach the EOF
+                                ok = false;
+                            }
+                        } catch (Throwable th) {
+                            if ( log.isDebugEnabled() ) log.debug("Unable to complete sendfile request:", th);
+                            ok = false;
+                        }
+                    }
+
+                    if (ok) {
+                        socket.getSocket().write(buffer, attachment, this);
+                    } else {
+                        try {
+                            attachment.fchannel.close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                        closeSocket(socket, SocketStatus.ERROR);
+                    }
+                }
+
+                @Override
+                public void failed(Throwable exc, SendfileData attachment) {
+                    // Closing channels
+                    closeSocket(socket, SocketStatus.ERROR);
+                    try {
+                        attachment.fchannel.close();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // ---------------------------------------------- SocketProcessor Inner Class
+    /**
+     * This class is the equivalent of the Worker, but is simply run by an
+     * external Executor thread pool. Each run performs the SSL handshake if
+     * it is not complete yet, hands the socket to the handler, and then
+     * closes or recycles the socket according to the resulting state.
+     */
+    protected class SocketProcessor implements Runnable {
+
+        private SocketWrapper<Nio2Channel> socket = null;
+        private SocketStatus status = null;
+
+        public SocketProcessor(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
+            reset(socket,status);
+        }
+
+        /**
+         * Re-initialises this processor so the instance can be reused.
+         *
+         * @param socket the socket wrapper to process
+         * @param status the status that triggered the processing
+         */
+        public void reset(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
+            this.socket = socket;
+            this.status = status;
+        }
+
+        @Override
+        public void run() {
+            if (socket == null) {
+                // There is nothing to lock on. doRun() copes with a missing
+                // socket and still returns this processor to the cache.
+                doRun();
+            } else if (socket.isUpgraded() &&
+                    SocketStatus.OPEN_WRITE == status) {
+                // Upgraded connections need to allow multiple threads to
+                // access the connection at the same time to enable blocking
+                // IO to be used when NIO has been configured
+                synchronized (socket.getWriteThreadLock()) {
+                    doRun();
+                }
+            } else {
+                synchronized (socket) {
+                    doRun();
+                }
+            }
+        }
+
+        /**
+         * Returns the channel and the wrapper of the current socket to their
+         * caches, when caching is active and there is something to recycle.
+         */
+        private void recycleSocket() {
+            if (socket == null || !useCaches || !running || paused) {
+                return;
+            }
+            Nio2Channel channel = socket.getSocket();
+            if (channel != null) {
+                // Never put a null channel into the cache
+                nioChannels.push(channel);
+            }
+            socketWrapperCache.push((Nio2SocketWrapper) socket);
+        }
+
+        private void doRun() {
+            boolean launch = false;
+            try {
+                int handshake = -1;
+
+                try {
+                    if (socket != null && socket.getSocket() != null) {
+                        // For STOP there is no point trying to handshake as the
+                        // Poller has been stopped.
+                        if (socket.getSocket().isHandshakeComplete() ||
+                                status == SocketStatus.STOP) {
+                            handshake = 0;
+                        } else {
+                            handshake = socket.getSocket().handshake();
+                            // The handshake process reads/writes from/to the
+                            // socket. status may therefore be OPEN_WRITE once
+                            // the handshake completes. However, the handshake
+                            // happens when the socket is opened so the status
+                            // must always be OPEN_READ after it completes. It
+                            // is OK to always set this as it is only used if
+                            // the handshake completes.
+                            status = SocketStatus.OPEN_READ;
+                        }
+                    }
+                } catch (IOException x) {
+                    handshake = -1;
+                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
+                }
+                if (handshake == 0) {
+                    SocketState state = SocketState.OPEN;
+                    // Process the request from this socket
+                    if (status == null) {
+                        state = handler.process(socket, SocketStatus.OPEN_READ);
+                    } else {
+                        state = handler.process(socket, status);
+                    }
+                    if (state == SocketState.CLOSED) {
+                        // Close socket and pool
+                        try {
+                            socket.setComet(false);
+                            closeSocket(socket, SocketStatus.ERROR);
+                            recycleSocket();
+                        } catch (Exception x) {
+                            log.error("",x);
+                        }
+                    } else if (state == SocketState.UPGRADING) {
+                        socket.setKeptAlive(true);
+                        socket.access();
+                        launch = true;
+                    }
+                } else if (handshake == -1 ) {
+                    // Handshake failed, or there was no usable socket
+                    if (socket != null) {
+                        closeSocket(socket, SocketStatus.DISCONNECT);
+                    }
+                    recycleSocket();
+                }
+            } catch (OutOfMemoryError oom) {
+                try {
+                    oomParachuteData = null;
+                    log.error("", oom);
+                    if (socket != null) {
+                        closeSocket(socket, SocketStatus.ERROR);
+                    }
+                    releaseCaches();
+                } catch (Throwable oomt) {
+                    try {
+                        System.err.println(oomParachuteMsg);
+                        oomt.printStackTrace();
+                    } catch (Throwable letsHopeWeDontGetHere){
+                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
+                    }
+                }
+            } catch (VirtualMachineError vme) {
+                ExceptionUtils.handleThrowable(vme);
+            } catch (Throwable t) {
+                log.error("", t);
+                if (socket != null) {
+                    closeSocket(socket, SocketStatus.ERROR);
+                }
+            } finally {
+                if (launch) {
+                    try {
+                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+                    } catch (NullPointerException npe) {
+                        if (running) {
+                            log.error(sm.getString("endpoint.launch.fail"),
+                                    npe);
+                        }
+                    }
+                }
+                socket = null;
+                status = null;
+                //return to cache
+                if (useCaches && running && !paused) {
+                    processorCache.push(this);
+                }
+            }
+        }
+    }
+
+    // ----------------------------------------------- SendfileData Inner Class
+    /**
+     * Mutable description of one sendfile transfer, read and updated by
+     * {@code processSendfile}.
+     */
+    public static class SendfileData {
+        // File
+        // Name of the file to send; used to open fchannel when it is not open
+        public String fileName;
+        // Channel the file is read from; opened on demand by processSendfile
+        public FileChannel fchannel;
+        // Position at which fchannel is opened; advanced by each completed write
+        public long pos;
+        // Bytes still to send; reduced by each completed write, the transfer
+        // ends once it is no longer positive
+        public long length;
+        // KeepAlive flag
+        // true: wait for more bytes after the transfer; false: close the socket
+        public boolean keepAlive;
+    }
+}

Added: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,911 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+
+/**
+ * Implementation of a secure socket channel for NIO2.
+ */
+public class SecureNio2Channel extends Nio2Channel  {
+
+    // Network side buffers, allocated in the constructor as direct buffers
+    // sized to the SSL session's packet buffer size.
+    protected ByteBuffer netInBuffer;
+    protected ByteBuffer netOutBuffer;
+
+    protected SSLEngine sslEngine;
+    protected final Nio2Endpoint endpoint;
+    // Passed as the attachment of the asynchronous handshake reads and writes
+    protected SocketWrapper<Nio2Channel> socket;
+
+    // Set once the handshake reached FINISHED with an empty netOutBuffer
+    protected boolean handshakeComplete = false;
+    protected HandshakeStatus handshakeStatus; //gets set by handshake
+
+    // All four flags are cleared by reset()
+    protected boolean closed = false;
+    protected boolean closing = false;
+    protected boolean readPending = false;
+    protected boolean writePending = false;
+
+    // Created in the constructor; used by the asynchronous handshake reads and
+    // writes to re-dispatch the socket or close it on failure
+    private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> handshakeReadCompletionHandler;
+    private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> handshakeWriteCompletionHandler;
+
+    /**
+     * Creates the secure channel: allocates the network buffers, creates the
+     * completion handlers used by the asynchronous handshake and starts the
+     * handshake through {@link #reset()}.
+     *
+     * @param channel    the underlying asynchronous socket channel
+     * @param engine     the SSL engine to use
+     * @param bufHandler the application buffer handler
+     * @param endpoint0  the endpoint this channel belongs to
+     * @throws IOException if resetting the channel fails
+     */
+    public SecureNio2Channel(AsynchronousSocketChannel channel, SSLEngine engine,
+            ApplicationBufferHandler bufHandler, Nio2Endpoint endpoint0) throws IOException {
+        super(channel, bufHandler);
+        sslEngine = engine;
+        endpoint = endpoint0;
+        final int applicationSize = sslEngine.getSession().getApplicationBufferSize();
+        final int packetSize = sslEngine.getSession().getPacketBufferSize();
+        //allocate network buffers - TODO, add in optional direct non-direct buffers
+        netInBuffer = ByteBuffer.allocateDirect(packetSize);
+        netOutBuffer = ByteBuffer.allocateDirect(packetSize);
+
+        // A completed handshake read re-dispatches the socket for reading,
+        // a negative count or a failure closes it
+        handshakeReadCompletionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+            @Override
+            public void completed(Integer result, SocketWrapper<Nio2Channel> attachment) {
+                if (result >= 0) {
+                    endpoint.processSocket(attachment, SocketStatus.OPEN_READ, false);
+                } else {
+                    failed(new IOException("Error"), attachment);
+                }
+            }
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                endpoint.closeSocket(attachment, SocketStatus.ERROR);
+            }
+        };
+        // Same as above, but re-dispatches the socket for writing
+        handshakeWriteCompletionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+            @Override
+            public void completed(Integer result, SocketWrapper<Nio2Channel> attachment) {
+                if (result >= 0) {
+                    endpoint.processSocket(attachment, SocketStatus.OPEN_WRITE, false);
+                } else {
+                    failed(new IOException("Error"), attachment);
+                }
+            }
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                endpoint.closeSocket(attachment, SocketStatus.ERROR);
+            }
+        };
+
+        //ensure that the application has a large enough read/write buffers
+        //by doing this, we should not encounter any buffer overflow errors
+        // FIXME: this does nothing, so it is in the NIO2 endpoint
+        bufHandler.expand(bufHandler.getReadBuffer(), applicationSize);
+        reset();
+    }
+
+    /**
+     * Associates this channel with its socket wrapper, which is used as the
+     * attachment of the asynchronous handshake operations.
+     *
+     * @param socket the socket wrapper
+     */
+    void setSocket(SocketWrapper<Nio2Channel> socket) {
+        SocketWrapper<Nio2Channel> wrapper = socket;
+        this.socket = wrapper;
+    }
+
+    /**
+     * Installs a new SSL engine and then resets the channel.
+     *
+     * @param engine the SSL engine to use from now on
+     * @throws IOException if resetting the channel fails
+     */
+    public void reset(SSLEngine engine) throws IOException {
+        sslEngine = engine;
+        this.reset();
+    }
+
+    /**
+     * Resets the channel: empties both network buffers, clears the state
+     * flags and begins a new handshake on the SSL engine.
+     *
+     * @throws IOException if the parent reset or the handshake start fails
+     */
+    @Override
+    public void reset() throws IOException {
+        super.reset();
+        // Mark both network buffers as holding no data
+        netOutBuffer.position(0).limit(0);
+        netInBuffer.position(0).limit(0);
+        handshakeComplete = false;
+        closed = closing = false;
+        readPending = writePending = false;
+        //initiate handshake
+        sslEngine.beginHandshake();
+        handshakeStatus = sslEngine.getHandshakeStatus();
+    }
+
+    /**
+     * @return the parent's buffer size plus the capacity of each network
+     *         buffer that has been allocated
+     */
+    @Override
+    public int getBufferSize() {
+        int total = super.getBufferSize();
+        if (netInBuffer != null) {
+            total += netInBuffer.capacity();
+        }
+        if (netOutBuffer != null) {
+            total += netOutBuffer.capacity();
+        }
+        return total;
+    }
+
+    /**
+     * Adapts the {@code Future<Integer>} of a write to a
+     * {@code Future<Boolean>} that yields {@code true} when the write
+     * reported a non-negative byte count. All other operations are delegated
+     * unchanged.
+     */
+    private class FutureFlush implements Future<Boolean> {
+        private Future<Integer> integer;
+
+        protected FutureFlush(Future<Integer> integer) {
+            this.integer = integer;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return this.integer.cancel(mayInterruptIfRunning);
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return this.integer.isCancelled();
+        }
+
+        @Override
+        public boolean isDone() {
+            return this.integer.isDone();
+        }
+
+        @Override
+        public Boolean get() throws InterruptedException,
+                ExecutionException {
+            return Boolean.valueOf(this.integer.get().intValue() >= 0);
+        }
+
+        @Override
+        public Boolean get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            return Boolean.valueOf(this.integer.get(timeout, unit).intValue() >= 0);
+        }
+    }
+
+    /**
+     * Flush the channel by starting a write of the outgoing network buffer.
+     *
+     * @return a future that yields <code>true</code> when the underlying
+     *         write reported a non-negative byte count, else
+     *         <code>false</code>
+     * @throws IOException
+     */
+    @Override
+    public Future<Boolean> flush()
+            throws IOException {
+        Future<Integer> pendingWrite = sc.write(netOutBuffer);
+        return new FutureFlush(pendingWrite);
+    }
+
+    /**
+     * Performs the SSL handshake using asynchronous IO, but performs NEED_TASK
+     * on the same thread.<br>
+     * Hence, you should never call this method using your Acceptor thread, as
+     * you would slow down your system significantly.<br>
+     * Delegates to {@link #handshakeInternal(boolean)} in asynchronous mode.
+     *
+     * @return int - 0 if the handshake is complete, otherwise the
+     *         non-complete value returned by
+     *         {@link #handshakeInternal(boolean)}
+     * @throws IOException
+     */
+    @Override
+    public int handshake() throws IOException {
+        final boolean async = true;
+        return handshakeInternal(async);
+    }
+
+    /**
+     * Drives the SSL handshake state machine until it either completes or
+     * needs network IO.
+     * <p>
+     * In asynchronous mode the required read or write is started with the
+     * matching handshake completion handler and the method returns at once.
+     * In blocking mode the read or write is awaited for at most the endpoint's
+     * socket timeout before the method returns.
+     *
+     * @param async {@code true} to use completion handlers, {@code false} to
+     *              block on each IO operation
+     * @return 0 if the handshake is complete, otherwise
+     *         {@code Nio2Endpoint.OP_READ} or {@code Nio2Endpoint.OP_WRITE}
+     *         depending on the IO operation that was issued
+     * @throws EOFException if, in blocking mode, the peer closed the
+     *         connection while handshake data was being read
+     * @throws IOException if the handshake fails, or if a blocking IO
+     *         operation is interrupted, fails or times out (the original
+     *         exception is kept as the cause)
+     */
+    protected int handshakeInternal(boolean async) throws IOException {
+        if (handshakeComplete)
+            return 0; //we have done our initial handshake
+
+        SSLEngineResult handshake = null;
+
+        while (!handshakeComplete) {
+            switch (handshakeStatus) {
+                case NOT_HANDSHAKING: {
+                    //should never happen
+                    throw new IOException("NOT_HANDSHAKING during handshake");
+                }
+                case FINISHED: {
+                    //we are complete if we have delivered the last package
+                    handshakeComplete = !netOutBuffer.hasRemaining();
+                    //return 0 if we are complete, otherwise we still have data to write
+                    if (handshakeComplete) {
+                        return 0;
+                    } else {
+                        if (async) {
+                            sc.write(netOutBuffer, socket, handshakeWriteCompletionHandler);
+                        } else {
+                            try {
+                                sc.write(netOutBuffer).get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS);
+                            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                                throw new IOException("Handshake error", e);
+                            }
+                        }
+                        return Nio2Endpoint.OP_WRITE;
+                    }
+                }
+                case NEED_WRAP: {
+                    //perform the wrap function
+                    handshake = handshakeWrap();
+                    if (handshake.getStatus() == Status.OK){
+                        if (handshakeStatus == HandshakeStatus.NEED_TASK)
+                            handshakeStatus = tasks();
+                    } else {
+                        //wrap should always work with our buffers
+                        throw new IOException("Unexpected status:" + handshake.getStatus() + " during handshake WRAP.");
+                    }
+                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || netOutBuffer.remaining() > 0) {
+                        //should actually return OP_READ if we have NEED_UNWRAP
+                        if (async) {
+                            sc.write(netOutBuffer, socket, handshakeWriteCompletionHandler);
+                        } else {
+                            try {
+                                sc.write(netOutBuffer).get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS);
+                            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                                throw new IOException("Handshake error", e);
+                            }
+                        }
+                        return Nio2Endpoint.OP_WRITE;
+                    }
+                    //fall down to NEED_UNWRAP on the same call, will result in a
+                    //BUFFER_UNDERFLOW if it needs data
+                }
+                //$FALL-THROUGH$
+                case NEED_UNWRAP: {
+                    //perform the unwrap function
+                    handshake = handshakeUnwrap();
+                    if (handshake.getStatus() == Status.OK) {
+                        if (handshakeStatus == HandshakeStatus.NEED_TASK)
+                            handshakeStatus = tasks();
+                    } else if (handshake.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        //read more data
+                        if (async) {
+                            sc.read(netInBuffer, socket, handshakeReadCompletionHandler);
+                        } else {
+                            int read;
+                            try {
+                                read = sc.read(netInBuffer).get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS).intValue();
+                            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                                throw new IOException("Handshake error", e);
+                            }
+                            if (read < 0) {
+                                // The peer closed the connection. Without this
+                                // check a blocking caller would keep looping on
+                                // reads that return -1 immediately.
+                                throw new EOFException("EOF during handshake.");
+                            }
+                        }
+                        return Nio2Endpoint.OP_READ;
+                    } else {
+                        throw new IOException("Invalid handshake status:"+handshakeStatus+" during handshake UNWRAP.");
+                    }
+                    break;
+                }
+                case NEED_TASK: {
+                    handshakeStatus = tasks();
+                    break;
+                }
+                default: throw new IllegalStateException("Invalid handshake status:"+handshakeStatus);
+            }
+        }
+        //return 0 if we are complete, otherwise recurse to process the task
+        return handshakeComplete ? 0 : handshakeInternal(async);
+    }
+
+    /**
+     * Forces a new handshake using blocking IO.
+     * The network and application buffers must not hold unprocessed data when
+     * this is called, otherwise an IOException is thrown before anything is
+     * reset.
+     *
+     * @throws IOException - if an IO exception occurs or if application or network buffers contain data
+     */
+    public void rehandshake() throws IOException {
+        //validate the network buffers are empty
+        if (netInBuffer.position() > 0 && netInBuffer.position()<netInBuffer.limit()) {
+            throw new IOException("Network input buffer still contains data. Handshake will fail.");
+        }
+        if (netOutBuffer.position() > 0 && netOutBuffer.position()<netOutBuffer.limit()) {
+            throw new IOException("Network output buffer still contains data. Handshake will fail.");
+        }
+        //validate the application buffers are empty
+        if (getBufHandler().getReadBuffer().position()>0
+                && getBufHandler().getReadBuffer().position()<getBufHandler().getReadBuffer().limit()) {
+            throw new IOException("Application input buffer still contains data. Data would have been lost.");
+        }
+        if (getBufHandler().getWriteBuffer().position()>0
+                && getBufHandler().getWriteBuffer().position()<getBufHandler().getWriteBuffer().limit()) {
+            throw new IOException("Application output buffer still contains data. Data would have been lost.");
+        }
+        reset();
+        try {
+            boolean done = false;
+            while (!done) {
+                int hsStatus = handshakeInternal(false);
+                if (hsStatus == -1) {
+                    throw new EOFException("EOF during handshake.");
+                }
+                // Any value other than 0 means some blocking IO occurred,
+                // so iterate
+                if (hsStatus == 0) {
+                    done = true;
+                }
+            }
+        } catch (IOException x) {
+            throw x;
+        } catch (Exception cx) {
+            throw new IOException(cx);
+        }
+    }
+
+
+
+    /**
+     * Runs every delegated task the SSL engine currently offers, on the
+     * calling thread.
+     * @return the engine's handshake status after the tasks have run
+     */
+    protected SSLEngineResult.HandshakeStatus tasks() {
+        for (Runnable task = sslEngine.getDelegatedTask(); task != null;
+                task = sslEngine.getDelegatedTask()) {
+            task.run();
+        }
+        return sslEngine.getHandshakeStatus();
+    }
+
+    /**
+     * Performs the WRAP step of the handshake: wraps the application write
+     * buffer into the outgoing network buffer, leaves that buffer flipped and
+     * ready to be written, and records the resulting handshake status in
+     * {@code handshakeStatus}.
+     * @return SSLEngineResult the result of the wrap operation
+     * @throws IOException
+     */
+    protected SSLEngineResult handshakeWrap() throws IOException {
+        //this should never be called with a network buffer that contains data
+        //so we can clear it here.
+        netOutBuffer.clear();
+        //perform the wrap
+        SSLEngineResult result = sslEngine.wrap(bufHandler.getWriteBuffer(), netOutBuffer);
+        //prepare the results to be written
+        netOutBuffer.flip();
+        //set the status
+        handshakeStatus = result.getHandshakeStatus();
+        return result;
+    }
+
+    /**
+     * Performs the UNWRAP step of the handshake: unwraps data from the
+     * incoming network buffer into the application read buffer, repeating
+     * while the engine reports OK and still asks for NEED_UNWRAP. Delegated
+     * tasks are run inline and {@code handshakeStatus} is updated on each
+     * pass.
+     * @return SSLEngineResult the result of the last unwrap operation
+     * @throws IOException
+     */
+    protected SSLEngineResult handshakeUnwrap() throws IOException {
+
+        if (netInBuffer.position() == netInBuffer.limit()) {
+            //clear the buffer if we have emptied it out on data
+            netInBuffer.clear();
+        }
+        SSLEngineResult result;
+        boolean cont = false;
+        //loop while we can perform pure SSLEngine data
+        do {
+            //prepare the buffer with the incoming data
+            netInBuffer.flip();
+            //call unwrap
+            result = sslEngine.unwrap(netInBuffer, bufHandler.getReadBuffer());
+            //compact the buffer so unconsumed bytes move to the front and the
+            //buffer is ready to receive more data
+            netInBuffer.compact();
+            //read in the status
+            handshakeStatus = result.getHandshakeStatus();
+            if (result.getStatus() == SSLEngineResult.Status.OK &&
+                 result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                //execute tasks if we need to
+                handshakeStatus = tasks();
+            }
+            //perform another unwrap?
+            cont = result.getStatus() == SSLEngineResult.Status.OK &&
+                   handshakeStatus == HandshakeStatus.NEED_UNWRAP;
+        } while (cont);
+        return result;
+    }
+
+    /**
+     * Sends a SSL close message, will not physically close the connection here.<br>
+     * To close the connection, you could do something like
+     * <pre><code>
+     *   close();
+     *   while (isOpen() && !myTimeoutFunction()) Thread.sleep(25);
+     *   if ( isOpen() ) close(true); //forces a close if you timed out
+     * </code></pre>
+     * Calling this method again once closing has started does nothing.
+     * @throws IOException if there is data on the outgoing network buffer and
+     *         we are unable to flush it within the endpoint's socket timeout,
+     *         if the waiting thread is interrupted, or if the SSL engine does
+     *         not report the CLOSED status for the close message
+     */
+    @Override
+    public void close() throws IOException {
+        if (closing) return;
+        closing = true;
+        sslEngine.closeOutbound();
+
+        try {
+            if (!flush().get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS)) {
+                throw new IOException("Remaining data in the network buffer, can't send SSL close message, force a close with close(true) instead");
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag: converting the exception must not
+            // hide the interruption from code further up the stack.
+            Thread.currentThread().interrupt();
+            throw new IOException("Remaining data in the network buffer, can't send SSL close message, force a close with close(true) instead", e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new IOException("Remaining data in the network buffer, can't send SSL close message, force a close with close(true) instead", e);
+        }
+        //prep the buffer for the close message
+        netOutBuffer.clear();
+        //perform the close, since we called sslEngine.closeOutbound
+        SSLEngineResult handshake = sslEngine.wrap(getEmptyBuf(), netOutBuffer);
+        //we should be in a close state
+        if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
+            throw new IOException("Invalid close state, will not send network data.");
+        }
+        //prepare the buffer for writing
+        netOutBuffer.flip();
+        //if there is data to be written
+        //(the Future returned by flush() is not awaited here, so the check
+        //below only reflects what has been written so far)
+        flush();
+
+        //is the channel closed?
+        closed = (!netOutBuffer.hasRemaining() && (handshake.getHandshakeStatus() != HandshakeStatus.NEED_WRAP));
+    }
+
+    /**
+     * Runs the SSL close sequence and afterwards closes the underlying socket
+     * channel when that is requested or when the SSL close completed.
+     * @param force if true the socket channel is closed even when the SSL
+     *              close sequence did not complete or threw an exception
+     * @throws IOException if the SSL close sequence or the channel close fails
+     */
+    @Override
+    public void close(boolean force) throws IOException {
+        try {
+            close();
+        } finally {
+            final boolean shutdownChannel = force || closed;
+            if (shutdownChannel) {
+                closed = true;
+                sc.close();
+            }
+        }
+    }
+
+    /**
+     * Future returned by read(ByteBuffer) when the network input buffer
+     * already holds data: no channel read is started, the buffered bytes are
+     * unwrapped when one of the get methods is called. It therefore reports
+     * itself as done and can not be cancelled.
+     */
+    private class FutureRead implements Future<Integer> {
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+        @Override
+        public Integer get() throws InterruptedException, ExecutionException {
+            return unwrap(netInBuffer.position());
+        }
+        //the timeout is not used: nothing is read from the network here
+        @Override
+        public Integer get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            return unwrap(netInBuffer.position());
+        }
+        /**
+         * Unwraps the content of the network input buffer into the
+         * application read buffer.
+         * @param netread number of bytes obtained from the network, -1 for EOF
+         * @return the number of application bytes produced, or -1 if the
+         *         channel is closing/closed or netread signalled EOF
+         * @throws ExecutionException wrapping the SSLException raised by the
+         *         engine, or an IOException for an unexpected unwrap status
+         */
+        protected Integer unwrap(int netread) throws ExecutionException {
+            //are we in the middle of closing or closed?
+            if (closing || closed)
+                return -1;
+            //did we reach EOF? if so send EOF up one layer.
+            if (netread == -1)
+                return -1;
+            //the data read
+            int read = 0;
+            //the SSL engine result
+            SSLEngineResult unwrap;
+            do {
+                //prepare the buffer
+                netInBuffer.flip();
+                //unwrap the data
+                try {
+                    unwrap = sslEngine.unwrap(netInBuffer, bufHandler.getReadBuffer());
+                } catch (SSLException e) {
+                    throw new ExecutionException(e);
+                }
+                //compact the buffer
+                netInBuffer.compact();
+                if (unwrap.getStatus()==Status.OK || unwrap.getStatus()==Status.BUFFER_UNDERFLOW) {
+                    //we did receive some data, add it to our total
+                    read += unwrap.bytesProduced();
+                    //perform any tasks if needed
+                    if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+                        tasks();
+                    //if we need more network data, then bail out for now.
+                    if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW)
+                        break;
+                } else if (unwrap.getStatus()==Status.BUFFER_OVERFLOW && read > 0) {
+                    //buffer overflow can happen, if we have read data, then
+                    //empty out the dst buffer before we do another read
+                    break;
+                } else {
+                    //here we should trap BUFFER_OVERFLOW and call expand on the buffer
+                    //for now, throw an exception, as we initialized the buffers
+                    //in the constructor
+                    throw new ExecutionException(new IOException("Unable to unwrap data, invalid status: " + unwrap.getStatus()));
+                }
+            } while ((netInBuffer.position() != 0)); //continue unwrapping as long as the input buffer has stuff
+            return (read);
+        }
+    }
+
+    /**
+     * Future returned by read(ByteBuffer) when the network input buffer is
+     * empty: a read from the socket channel into the network input buffer is
+     * started by the constructor, and the bytes received are unwrapped when
+     * one of the get methods is called. Cancellation and state queries are
+     * delegated to the Future of the channel read.
+     */
+    private class FutureNetRead extends FutureRead {
+        private Future<Integer> integer;
+        protected FutureNetRead() {
+            integer = sc.read(netInBuffer);
+        }
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return this.integer.cancel(mayInterruptIfRunning);
+        }
+        @Override
+        public boolean isCancelled() {
+            return this.integer.isCancelled();
+        }
+        @Override
+        public boolean isDone() {
+            return this.integer.isDone();
+        }
+        @Override
+        public Integer get() throws InterruptedException, ExecutionException {
+            // wait for the channel read, then decrypt what arrived
+            return unwrap(this.integer.get());
+        }
+        @Override
+        public Integer get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            // the timeout bounds the wait for the channel read only
+            return unwrap(this.integer.get(timeout, unit));
+        }
+    }
+
+    /**
+     * Starts a read of application data. If the network input buffer already
+     * holds bytes they are unwrapped directly, otherwise a read from the
+     * socket channel is started first.
+     *
+     * @param dst not consulted by this implementation: the unwrapped data is
+     *            placed into bufHandler.getReadBuffer()
+     * @return a Future whose result is the number of bytes unwrapped, possibly
+     *         zero, or <tt>-1</tt> if the channel is closing/closed or has
+     *         reached end-of-stream
+     * @throws IllegalStateException if the handshake has not been completed
+     */
+    @Override
+    public Future<Integer> read(ByteBuffer dst) {
+        //did we finish our handshake?
+        if (!handshakeComplete) {
+            throw new IllegalStateException("Handshake incomplete, you must complete handshake before reading data.");
+        }
+        //buffered network data can be unwrapped without touching the socket
+        final boolean networkDataBuffered = netInBuffer.position() > 0;
+        return networkDataBuffered ? new FutureRead() : new FutureNetRead();
+    }
+
+    /**
+     * Future returned by write(ByteBuffer). The constructor wraps the
+     * application write buffer into the network output buffer and starts the
+     * channel write; the get methods wait for that write and return the number
+     * of application bytes consumed by the wrap. If the channel is closing or
+     * the wrap fails, the failure is remembered and reported by the get
+     * methods as an ExecutionException. In the closing case and when the
+     * engine throws, no channel write is started at all.
+     */
+    private class FutureWrite implements Future<Integer> {
+        private Future<Integer> integer = null;
+        private int written = 0;
+        private Throwable t = null;
+        protected FutureWrite() {
+            //are we closing or closed?
+            if (closing || closed) {
+                t = new IOException("Channel is in closing state.");
+                return;
+            }
+            //The data buffer should be empty, we can reuse the entire buffer.
+            netOutBuffer.clear();
+            try {
+                SSLEngineResult result = sslEngine.wrap(bufHandler.getWriteBuffer(), netOutBuffer);
+                written = result.bytesConsumed();
+                netOutBuffer.flip();
+                if (result.getStatus() == Status.OK) {
+                    if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+                        tasks();
+                } else {
+                    t = new IOException("Unable to wrap data, invalid engine state: " +result.getStatus());
+                }
+                integer = sc.write(netOutBuffer);
+            } catch (SSLException e) {
+                t = e;
+            }
+        }
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            //no channel write was started, so there is nothing to cancel
+            if (integer == null) {
+                return false;
+            }
+            return integer.cancel(mayInterruptIfRunning);
+        }
+        @Override
+        public boolean isCancelled() {
+            return integer != null && integer.isCancelled();
+        }
+        @Override
+        public boolean isDone() {
+            //without a channel write the outcome (a failure) is already known
+            return integer == null || integer.isDone();
+        }
+        @Override
+        public Integer get() throws InterruptedException, ExecutionException {
+            if (t != null) {
+                throw new ExecutionException(t);
+            }
+            integer.get();
+            return written;
+        }
+        @Override
+        public Integer get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            if (t != null) {
+                throw new ExecutionException(t);
+            }
+            integer.get(timeout, unit);
+            return written;
+        }
+    }
+
+    /**
+     * Starts a write of application data by creating a FutureWrite, which
+     * wraps the data and writes the encrypted result to the socket channel.
+     *
+     * @param src NOTE(review): not referenced here; FutureWrite wraps
+     *            bufHandler.getWriteBuffer() - confirm callers always pass
+     *            that buffer
+     * @return a Future whose result is the number of application bytes
+     *         consumed by the wrap, possibly zero; failures are reported by
+     *         the Future's get methods
+     */
+    @Override
+    public Future<Integer> write(ByteBuffer src) {
+        return new FutureWrite();
+    }
+
+    /**
+     * Completion handler placed between a channel read into the network input
+     * buffer and the caller's handler: once network bytes are available they
+     * are unwrapped into <code>dst</code> and the caller's handler is completed
+     * with the number of application bytes produced. A negative byte count is
+     * reported to the caller as a ClosedChannelException failure.
+     */
+    private class ReadCompletionHandler<A> implements CompletionHandler<Integer, A> {
+        //destination for the unwrapped application data
+        protected ByteBuffer dst;
+        //the caller's handler that receives the final outcome
+        protected CompletionHandler<Integer, ? super A> handler;
+        protected ReadCompletionHandler(ByteBuffer dst, CompletionHandler<Integer, ? super A> handler) {
+            this.dst = dst;
+            this.handler = handler;
+        }
+
+        @Override
+        public void completed(Integer nBytes, A attach) {
+            if (nBytes < 0) {
+                handler.failed(new ClosedChannelException(), attach);
+                return;
+            }
+            try {
+                //the data read
+                int read = 0;
+                //the SSL engine result
+                SSLEngineResult unwrap;
+                do {
+                    //prepare the buffer
+                    netInBuffer.flip();
+                    //unwrap the data
+                    unwrap = sslEngine.unwrap(netInBuffer, dst);
+                    //compact the buffer
+                    netInBuffer.compact();
+                    if (unwrap.getStatus() == Status.OK || unwrap.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        //we did receive some data, add it to our total
+                        read += unwrap.bytesProduced();
+                        //perform any tasks if needed
+                        if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+                            tasks();
+                        //if we need more network data, then bail out for now.
+                        if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW)
+                            break;
+                    } else if (unwrap.getStatus() == Status.BUFFER_OVERFLOW && read > 0) {
+                        //buffer overflow can happen, if we have read data, then
+                        //empty out the dst buffer before we do another read
+                        break;
+                    } else {
+                        //here we should trap BUFFER_OVERFLOW and call expand on the buffer
+                        //for now, throw an exception, as we initialized the buffers
+                        //in the constructor
+                        throw new IOException("Unable to unwrap data, invalid status: " + unwrap.getStatus());
+                    }
+                } while ((netInBuffer.position() != 0)); //continue unwrapping as long as the input buffer has stuff
+                // Everything went fine, report the number of bytes produced
+                handler.completed(read, attach);
+            } catch (Exception e) {
+                // The operation must fail
+                handler.failed(e, attach);
+            }
+        }
+        @Override
+        public void failed(Throwable exc, A attach) {
+            //pass channel read failures straight on to the caller's handler
+            handler.failed(exc, attach);
+        }
+    }
+
+    /**
+     * Asynchronous read of application data into <code>dst</code>. Network
+     * bytes that are already buffered are unwrapped immediately on the calling
+     * thread; otherwise a channel read is started and the data is unwrapped
+     * when it completes. When the channel is closing or closed the handler is
+     * completed with -1.
+     * @throws IllegalStateException if the handshake has not been completed
+     */
+    @Override
+    public <A> void read(final ByteBuffer dst,
+            long timeout, TimeUnit unit, final A attachment,
+            final CompletionHandler<Integer, ? super A> handler) {
+        //are we in the middle of closing or closed?
+        if (closing || closed) {
+            handler.completed(-1, attachment);
+            return;
+        }
+        //did we finish our handshake?
+        if (!handshakeComplete) {
+            throw new IllegalStateException("Handshake incomplete, you must complete handshake before reading data.");
+        }
+        final ReadCompletionHandler<A> unwrapper = new ReadCompletionHandler<A>(dst, handler);
+        final int buffered = netInBuffer.position();
+        if (buffered > 0) {
+            //data is waiting in the network buffer, no channel read needed
+            unwrapper.completed(buffered, attachment);
+            return;
+        }
+        sc.read(netInBuffer, timeout, unit, attachment, unwrapper);
+    }
+
+    /**
+     * Asynchronous write: wraps the application write buffer into the network
+     * output buffer and writes the encrypted bytes to the socket channel. On
+     * success the handler is completed with the number of application bytes
+     * consumed by the wrap; every failure is reported through
+     * <code>handler.failed</code>.
+     */
+    @Override
+    public <A> void write(final ByteBuffer src, long timeout, TimeUnit unit, final A attachment,
+            final CompletionHandler<Integer, ? super A> handler) {
+        //are we closing or closed?
+        if (closing || closed) {
+            handler.failed(new IOException("Channel is in closing state."), attachment);
+            return;
+        }
+
+        try {
+            // Start from an empty network buffer
+            netOutBuffer.clear();
+            // Encrypt the application data into the network buffer
+            final SSLEngineResult wrapResult = sslEngine.wrap(bufHandler.getWriteBuffer(), netOutBuffer);
+            final int consumed = wrapResult.bytesConsumed();
+            netOutBuffer.flip();
+            if (wrapResult.getStatus() != Status.OK) {
+                handler.failed(new IOException("Unable to wrap data, invalid engine state: " +wrapResult.getStatus()), attachment);
+                return;
+            }
+            if (wrapResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                tasks();
+            }
+            // Translates the outcome of the channel write for the caller
+            final CompletionHandler<Integer, A> relay = new CompletionHandler<Integer, A>() {
+                @Override
+                public void completed(Integer nBytes, A attach) {
+                    if (nBytes < 0) {
+                        handler.failed(new ClosedChannelException(), attach);
+                        return;
+                    }
+                    // Report the consumed application bytes, not the
+                    // number of encrypted bytes written
+                    handler.completed(consumed, attach);
+                }
+                @Override
+                public void failed(Throwable exc, A attach) {
+                    handler.failed(exc, attach);
+                }
+            };
+            // Push the encrypted bytes to the channel
+            sc.write(netOutBuffer, timeout, unit, attachment, relay);
+        } catch (Throwable exp) {
+            handler.failed(exp, attachment);
+        }
+    }
+
+    /**
+     * Holder for the arguments and the progress of one gathering write. It is
+     * created by the gathering write method and handed to the completion
+     * handler that continues the operation.
+     */
+    private class GatherState<A> {
+        // arguments of the gathering write, stored as passed in
+        public ByteBuffer[] srcs;
+        public int offset;
+        public int length;
+        public A attachment;
+        public long timeout;
+        public TimeUnit unit;
+        public CompletionHandler<Long, ? super A> handler;
+        // running total of the bytes consumed by the wrap calls
+        public long writeCount = 0;
+        // starts one past the initial offset
+        public int pos;
+        protected GatherState(ByteBuffer[] srcs, int offset, int length,
+                long timeout, TimeUnit unit, A attachment,
+                CompletionHandler<Long, ? super A> handler) {
+            this.handler = handler;
+            this.attachment = attachment;
+            this.unit = unit;
+            this.timeout = timeout;
+            this.srcs = srcs;
+            this.length = length;
+            this.offset = offset;
+            this.pos = 1 + offset;
+        }
+    }
+
+    /**
+     * Completion handler that drives a gathering write. Each time the channel
+     * reports that the encrypted content of the network output buffer has been
+     * written, the next source buffer is wrapped and written, until every
+     * buffer in the range [offset, offset + length) has been processed; the
+     * caller's handler is then completed with the total number of application
+     * bytes consumed. The first buffer of the range is expected to have been
+     * wrapped by the code that created the state, which is why
+     * <code>state.pos</code> starts at offset + 1.
+     */
+    private class GatherCompletionHandler<A> implements CompletionHandler<Integer, GatherState<A>> {
+        protected GatherState<A> state;
+        protected GatherCompletionHandler(GatherState<A> state) {
+            this.state = state;
+        }
+        @Override
+        public void completed(Integer nBytes, GatherState<A> attachment) {
+            if (nBytes < 0) {
+                state.handler.failed(new ClosedChannelException(), state.attachment);
+                return;
+            }
+            // state.pos is the index of the next source buffer to wrap. It is
+            // the cursor that advances; state.offset is left untouched so the
+            // end of the range (offset + length) stays fixed and this test
+            // eventually becomes true.
+            if (state.pos >= state.offset + state.length) {
+                state.handler.completed(state.writeCount, state.attachment);
+                return;
+            }
+            try {
+                // Prepare the output buffer
+                netOutBuffer.clear();
+                // Wrap the next source buffer into the internal buffer
+                SSLEngineResult result = sslEngine.wrap(state.srcs[state.pos], netOutBuffer);
+                state.writeCount += result.bytesConsumed();
+                netOutBuffer.flip();
+                if (result.getStatus() == Status.OK) {
+                    if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+                        tasks();
+                } else {
+                    failed(new IOException("Unable to wrap data, invalid engine state: " +result.getStatus()), attachment);
+                    return;
+                }
+                // Advance before starting the write: its completion may be
+                // delivered before sc.write returns
+                state.pos++;
+                // Write data to the channel
+                sc.write(netOutBuffer, state.timeout, state.unit, state, this);
+            } catch (Throwable exp) {
+                failed(exp, attachment);
+            }
+        }
+        @Override
+        public void failed(Throwable exc, GatherState<A> attachment) {
+            state.handler.failed(exc, state.attachment);
+        }
+    }
+
+    /**
+     * Asynchronous gathering write: wraps and writes the buffers
+     * <code>srcs[offset] .. srcs[offset + length - 1]</code>. The first buffer
+     * is wrapped and written here, a GatherCompletionHandler continues the
+     * operation. The handler is completed with the total number of application
+     * bytes consumed, or failed if the channel is closing or a wrap/write
+     * fails. An empty range (length == 0) completes immediately with 0.
+     * @throws IndexOutOfBoundsException if offset/length do not describe a
+     *         valid range of <code>srcs</code>
+     */
+    @Override
+    public <A> void write(ByteBuffer[] srcs, int offset, int length,
+            long timeout, TimeUnit unit, A attachment,
+            CompletionHandler<Long, ? super A> handler) {
+        if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) {
+            throw new IndexOutOfBoundsException();
+        }
+        //are we closing or closed?
+        if (closing || closed) {
+            handler.failed(new IOException("Channel is in closing state."), attachment);
+            return;
+        }
+        // Nothing to write: srcs[offset] is outside the requested range (and
+        // may be outside the array), so it must not be wrapped.
+        if (length == 0) {
+            handler.completed(Long.valueOf(0), attachment);
+            return;
+        }
+        try {
+            GatherState<A> state = new GatherState<>(srcs, offset, length,
+                    timeout, unit, attachment, handler);
+            // Prepare the output buffer
+            netOutBuffer.clear();
+            // Wrap the source data into the internal buffer
+            SSLEngineResult result = sslEngine.wrap(srcs[offset], netOutBuffer);
+            state.writeCount += result.bytesConsumed();
+            netOutBuffer.flip();
+            if (result.getStatus() == Status.OK) {
+                if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+                    tasks();
+            } else {
+                handler.failed(new IOException("Unable to wrap data, invalid engine state: " +result.getStatus()), attachment);
+                return;
+            }
+            // Write data to the channel
+            sc.write(netOutBuffer, timeout, unit, state, new GatherCompletionHandler<A>(state));
+        } catch (Throwable exp) {
+            handler.failed(exp, attachment);
+        }
+    }
+
+    /**
+     * Callback interface that supplies the application data buffers used by
+     * the channel, and that is meant to allow buffers to be expanded
+     * when buffer overflow exceptions happen
+     */
+    public static interface ApplicationBufferHandler {
+        // NOTE(review): not called in the visible code; presumably returns a
+        // buffer with room for at least "remaining" more bytes - confirm
+        public ByteBuffer expand(ByteBuffer buffer, int remaining);
+        // buffer that unwrapped (decrypted) data is placed into
+        public ByteBuffer getReadBuffer();
+        // buffer holding the application data that is wrapped (encrypted)
+        public ByteBuffer getWriteBuffer();
+    }
+
+    /**
+     * @return the handler that supplies the application read/write buffers
+     */
+    @Override
+    public ApplicationBufferHandler getBufHandler() {
+        return bufHandler;
+    }
+
+    /**
+     * @return the current value of the handshakeComplete flag
+     */
+    @Override
+    public boolean isHandshakeComplete() {
+        return handshakeComplete;
+    }
+
+    /**
+     * @return true once close() has started the SSL close sequence
+     */
+    @Override
+    public boolean isClosing() {
+        return closing;
+    }
+
+    /**
+     * @return the SSL engine used by this channel to wrap and unwrap data
+     */
+    public SSLEngine getSslEngine() {
+        return sslEngine;
+    }
+
+    /**
+     * @return the buffer used as the (empty) source when the close message is
+     *         wrapped
+     */
+    public ByteBuffer getEmptyBuf() {
+        return emptyBuf;
+    }
+
+    /**
+     * Replaces the handler that supplies the application read/write buffers.
+     * @param bufHandler the new buffer handler
+     */
+    public void setBufHandler(ApplicationBufferHandler bufHandler) {
+        this.bufHandler = bufHandler;
+    }
+
+    /**
+     * @return the underlying asynchronous socket channel
+     */
+    @Override
+    public AsynchronousSocketChannel getIOChannel() {
+        return sc;
+    }
+
+}
\ No newline at end of file

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java?rev=1575905&r1=1575904&r2=1575905&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java Mon Mar 10 11:27:11 2014
@@ -16,9 +16,13 @@
  */
 package org.apache.tomcat.util.net;
 
+import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
 
 /**
  * Properties that can be set in the &lt;Connector&gt; element
@@ -27,13 +31,13 @@ import java.net.SocketException;
  */
 public class SocketProperties {
     /**
-     * Enable/disable key cache, this bounded cache stores
-     * KeyAttachment objects to reduce GC
+     * Enable/disable socket wrapper cache, this bounded cache stores
+     * SocketWrapper objects to reduce GC
      * Default is 500
      * -1 is unlimited
      * 0 is disabled
      */
-    protected int keyCache = 500;
+    protected int socketWrapperCache = 500;
 
     /**
      * Enable/disable socket processor cache, this bounded cache stores
@@ -212,6 +216,27 @@ public class SocketProperties {
             socket.setSoTimeout(soTimeout.intValue());
     }
 
+    /**
+     * Applies the configured socket options to an asynchronous socket channel.
+     * An option is only set when the corresponding property is not null.
+     * @param socket the channel to configure
+     * @throws IOException if an option can not be set on the channel
+     */
+    public void setProperties(AsynchronousSocketChannel socket) throws IOException {
+        if (rxBufSize != null) {
+            socket.setOption(StandardSocketOptions.SO_RCVBUF, rxBufSize.intValue());
+        }
+        if (txBufSize != null) {
+            socket.setOption(StandardSocketOptions.SO_SNDBUF, txBufSize.intValue());
+        }
+        if (soKeepAlive != null) {
+            socket.setOption(StandardSocketOptions.SO_KEEPALIVE, soKeepAlive.booleanValue());
+        }
+        if (soReuseAddress != null) {
+            socket.setOption(StandardSocketOptions.SO_REUSEADDR, soReuseAddress.booleanValue());
+        }
+        // linger is applied only when it is switched on and a time is configured
+        if (soLingerOn != null && soLingerOn.booleanValue() && soLingerTime != null) {
+            socket.setOption(StandardSocketOptions.SO_LINGER, soLingerTime.intValue());
+        }
+        if (tcpNoDelay != null) {
+            socket.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay.booleanValue());
+        }
+    }
+
+    /**
+     * Applies the configured receive buffer size and address reuse options to
+     * an asynchronous server socket channel. An option is only set when the
+     * corresponding property is not null.
+     * @param socket the server channel to configure
+     * @throws IOException if an option can not be set on the channel
+     */
+    public void setProperties(AsynchronousServerSocketChannel socket) throws IOException {
+        if (rxBufSize != null) {
+            socket.setOption(StandardSocketOptions.SO_RCVBUF, rxBufSize.intValue());
+        }
+        if (soReuseAddress != null) {
+            socket.setOption(StandardSocketOptions.SO_REUSEADDR, soReuseAddress.booleanValue());
+        }
+    }
 
     public boolean getDirectBuffer() {
         return directBuffer;
@@ -278,7 +303,11 @@ public class SocketProperties {
     }
 
     public int getKeyCache() {
-        return keyCache;
+        return socketWrapperCache;
+    }
+
+    public int getSocketWrapperCache() {
+        return socketWrapperCache;
     }
 
     public int getAppReadBufSize() {
@@ -366,8 +395,12 @@ public class SocketProperties {
         this.eventCache = eventCache;
     }
 
+    public void setSocketWrapperCache(int socketWrapperCache) {
+        this.socketWrapperCache = socketWrapperCache;
+    }
+
     public void setKeyCache(int keyCache) {
-        this.keyCache = keyCache;
+        this.socketWrapperCache = keyCache;
     }
 
     public void setAppReadBufSize(int appReadBufSize) {

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1575905&r1=1575904&r2=1575905&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Mon Mar 10 11:27:11 2014
@@ -108,6 +108,10 @@
         NIO connector and a request is sent using more than one AJP message.
         Patch provided by Amund Elstad. (markt)
       </fix>
+      <add>
+        Adds experimental NIO2 connector. Based on code developed by
+        Nabil Benothman. (remm)
+      </add>
     </changelog>
   </subsection>
   <subsection name="Jasper">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org