You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2014/03/10 12:27:12 UTC
svn commit: r1575905 [3/4] - in /tomcat/trunk: ./
java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/
java/org/apache/coyote/http11/upgrade/ java/org/apache/tomcat/util/net/
webapps/docs/ webapps/docs/config/
Added: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,1251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadPendingException;
+import java.nio.file.StandardOpenOption;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSessionContext;
+import javax.net.ssl.X509KeyManager;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.collections.SynchronizedStack;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.SecureNio2Channel.ApplicationBufferHandler;
+import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
+
+/**
+ * NIO2 endpoint.
+ */
+public class Nio2Endpoint extends AbstractEndpoint<Nio2Channel> {
+
+
+ // -------------------------------------------------------------- Constants
+
+
+ private static final Log log = LogFactory.getLog(Nio2Endpoint.class);
+
+
+ // Interest-op flag values. Each constant is a distinct single bit.
+ public static final int OP_REGISTER = 0x100; //register interest op
+ public static final int OP_CALLBACK = 0x200; //callback interest op
+ public static final int OP_READ = 0x400; //read interest op
+ public static final int OP_WRITE = 0x800; //write interest op
+
+ // ----------------------------------------------------------------- Fields
+
+ /**
+ * Server socket "pointer".
+ * Opened and bound in bind(); closed and set to null in unbind().
+ */
+ private AsynchronousServerSocketChannel serverSock = null;
+
+ /**
+ * Whether sendfile is used; see getUseSendfile() and setUseSendfile().
+ */
+ private boolean useSendfile = true;
+
+ /**
+ * The size of the OOM parachute, in bytes (it is used as the length of
+ * the byte array allocated by reclaimParachute()).
+ */
+ private int oomParachute = 1024*1024;
+
+ /**
+ * Allows detecting if a completion handler completes inline.
+ * Set by startInline() / endInline() and read by isInline(); the value is
+ * per thread.
+ */
+ private static ThreadLocal<Boolean> inlineCompletion = new ThreadLocal<>();
+
+ /**
+ * The oom parachute, when an OOM error happens,
+ * will release the data, giving the JVM instantly
+ * a chunk of data to be able to recover with.
+ */
+ private byte[] oomParachuteData = null;
+
+ /**
+ * Make sure this string has already been allocated
+ * (it is logged when the parachute cannot be reclaimed).
+ */
+ private static final String oomParachuteMsg =
+ "SEVERE:Memory usage is low, parachute is non existent, your system may start failing.";
+
+ /**
+ * Keep track of OOM warning messages: time in milliseconds of the last
+ * warning, used by checkParachute() to log at most once every 10 seconds.
+ */
+ private long lastParachuteCheck = System.currentTimeMillis();
+
+ /**
+ * Cache for SocketProcessor objects
+ * (only used when useCaches is true).
+ */
+ private final SynchronizedStack<SocketProcessor> processorCache =
+ new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
+ socketProperties.getProcessorCache());
+
+ /**
+ * Cache for socket wrapper objects
+ * (only used when useCaches is true).
+ */
+ private final SynchronizedStack<Nio2SocketWrapper> socketWrapperCache =
+ new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
+ socketProperties.getSocketWrapperCache());
+
+ /**
+ * Channel cache (only used when useCaches is true). Each channel holds
+ * a set of byte buffers (two, or four for SSL channels).
+ */
+ private final SynchronizedStack<Nio2Channel> nioChannels =
+ new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
+ socketProperties.getBufferPoolSize());
+
+
+ // ------------------------------------------------------------- Properties
+
+
+ /**
+ * Use the object caches to reduce GC at the expense of additional memory use.
+ * Disabled by default.
+ */
+ private boolean useCaches = false;
+ public void setUseCaches(boolean useCaches) { this.useCaches = useCaches; }
+ public boolean getUseCaches() { return useCaches; }
+
+
+ /**
+ * Priority of the poller threads.
+ * NOTE(review): no poller threads are created in the visible code of this
+ * endpoint, so this property looks unused here - confirm.
+ */
+ private int pollerThreadPriority = Thread.NORM_PRIORITY;
+ public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; }
+ public int getPollerThreadPriority() { return pollerThreadPriority; }
+
+
+ /**
+ * Handling of accepted sockets.
+ */
+ private Handler handler = null;
+ public void setHandler(Handler handler ) { this.handler = handler; }
+ public Handler getHandler() { return handler; }
+
+
+ /**
+ * Allow comet request handling.
+ */
+ private boolean useComet = true;
+ public void setUseComet(boolean useComet) { this.useComet = useComet; }
+ @Override
+ public boolean getUseComet() { return useComet; }
+ // Comet timeouts are reported as supported whenever comet itself is enabled.
+ @Override
+ public boolean getUseCometTimeout() { return getUseComet(); }
+ @Override
+ public boolean getUsePolling() { return true; } // Always supported
+
+ // Replaces the socket properties object used by this endpoint.
+ public void setSocketProperties(SocketProperties socketProperties) {
+ this.socketProperties = socketProperties;
+ }
+
+ // Enables or disables sendfile; read back through getUseSendfile().
+ public void setUseSendfile(boolean useSendfile) {
+ this.useSendfile = useSendfile;
+ }
+
+ /**
+ * Is deferAccept supported?
+ */
+ @Override
+ public boolean getDeferAccept() {
+ // Not supported
+ return false;
+ }
+
+ // Sets the parachute size in bytes; a value <= 0 disables the parachute
+ // (see reclaimParachute()).
+ public void setOomParachute(int oomParachute) {
+ this.oomParachute = oomParachute;
+ }
+
+ // Directly replaces the parachute array.
+ public void setOomParachuteData(byte[] oomParachuteData) {
+ this.oomParachuteData = oomParachuteData;
+ }
+
+
+ // SSL state. sslContext, enabledCiphers and enabledProtocols are populated
+ // in bind() when SSL is enabled; sslContext is cleared again in unbind().
+ private SSLContext sslContext = null;
+ public SSLContext getSSLContext() { return sslContext;}
+ public void setSSLContext(SSLContext c) { sslContext = c;}
+ private String[] enabledCiphers;
+ private String[] enabledProtocols;
+
+    /**
+     * Port in use.
+     *
+     * @return the port the server socket is bound to, or -1 when there is no
+     *         server socket, its local address cannot be read, or the address
+     *         is not an {@link InetSocketAddress}
+     */
+    @Override
+    public int getLocalPort() {
+        // Work on a local copy: the field is nulled by unbind().
+        AsynchronousServerSocketChannel channel = serverSock;
+        if (channel == null) {
+            return -1;
+        }
+        SocketAddress local;
+        try {
+            local = channel.getLocalAddress();
+        } catch (IOException e) {
+            return -1;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (local instanceof InetSocketAddress) {
+            return ((InetSocketAddress) local).getPort();
+        }
+        return -1;
+    }
+
+
+ // Returns the cipher suites determined in bind(); the array is returned
+ // as-is (no copy) and is null until SSL has been initialised.
+ @Override
+ public String[] getCiphersUsed() {
+ return enabledCiphers;
+ }
+
+
+ // --------------------------------------------------------- OOM Parachute Methods
+
+    /**
+     * Tries to re-allocate the OOM parachute without forcing it. When that
+     * fails and more than 10 seconds have passed since the last warning, the
+     * low-memory message is logged (falling back to System.err if logging
+     * itself fails) and the warning timestamp is refreshed.
+     */
+    protected void checkParachute() {
+        if (reclaimParachute(false)) {
+            return;
+        }
+        long sinceLastWarning = System.currentTimeMillis() - lastParachuteCheck;
+        if (sinceLastWarning <= 10000) {
+            return;
+        }
+        try {
+            log.fatal(oomParachuteMsg);
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            System.err.println(oomParachuteMsg);
+        }
+        lastParachuteCheck = System.currentTimeMillis();
+    }
+
+    /**
+     * Ensures the OOM parachute array is allocated.
+     *
+     * @param force allocate even when free memory is not at least twice the
+     *              parachute size
+     * @return true if the parachute is allocated when this method returns
+     */
+    protected boolean reclaimParachute(boolean force) {
+        if (oomParachuteData != null) {
+            return true;
+        }
+        // Compute the threshold as a long: "oomParachute*2" is int arithmetic
+        // and wraps to a negative value for parachutes of 1 GiB or more,
+        // which would make the free-memory test always pass.
+        if (oomParachute > 0
+                && (force || Runtime.getRuntime().freeMemory() > 2L * oomParachute)) {
+            oomParachuteData = new byte[oomParachute];
+        }
+        return oomParachuteData != null;
+    }
+
+    /**
+     * Empties the three object caches when caching is enabled and asks the
+     * handler, if one is set, to recycle.
+     */
+    protected void releaseCaches() {
+        if (useCaches) {
+            socketWrapperCache.clear();
+            nioChannels.clear();
+            processorCache.clear();
+        }
+        if (handler != null) {
+            handler.recycle();
+        }
+    }
+
+ // --------------------------------------------------------- Public Methods
+
+ /**
+ * Number of keepalive sockets.
+ * Not implemented for this endpoint: the method always returns 0.
+ */
+ public int getKeepAliveCount() {
+ return 0;
+ // FIXME: would need some specific statistics gathering
+ }
+
+
+ // ----------------------------------------------- Public Lifecycle Methods
+
+
+ /**
+ * Initialize the endpoint: open and bind the server socket, default the
+ * acceptor thread count, initialise SSL when enabled and allocate the OOM
+ * parachute.
+ *
+ * @throws Exception if any of the steps above fails
+ */
+ @Override
+ public void bind() throws Exception {
+
+ // Create worker collection
+ if ( getExecutor() == null ) {
+ createExecutor();
+ }
+ // Use the endpoint executor as the channel group's thread pool when it
+ // is an ExecutorService; otherwise the group stays null.
+ AsynchronousChannelGroup threadGroup = null;
+ if (getExecutor() instanceof ExecutorService) {
+ threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
+ }
+
+ serverSock = AsynchronousServerSocketChannel.open(threadGroup);
+ socketProperties.setProperties(serverSock);
+ // Bind to the configured address, or to the wildcard address when none is set
+ InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
+ serverSock.bind(addr,getBacklog());
+
+ // Initialize thread count defaults for acceptor, poller
+ if (acceptorThreadCount == 0) {
+ // NIO2 does not allow any form of IO concurrency
+ acceptorThreadCount = 1;
+ }
+
+ // Initialize SSL if needed
+ if (isSSLEnabled()) {
+ SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);
+
+ sslContext = sslUtil.createSSLContext();
+ // Key managers are passed through wrap() so a configured key alias is honoured
+ sslContext.init(wrap(sslUtil.getKeyManagers()),
+ sslUtil.getTrustManagers(), null);
+
+ SSLSessionContext sessionContext =
+ sslContext.getServerSessionContext();
+ if (sessionContext != null) {
+ sslUtil.configureSessionContext(sessionContext);
+ }
+ // Determine which cipher suites and protocols to enable
+ enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
+ enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
+ }
+
+ // Force the parachute allocation regardless of current free memory
+ if (oomParachute>0) reclaimParachute(true);
+ }
+
+ public KeyManager[] wrap(KeyManager[] managers) {
+ if (managers==null) return null;
+ KeyManager[] result = new KeyManager[managers.length];
+ for (int i=0; i<result.length; i++) {
+ if (managers[i] instanceof X509KeyManager && getKeyAlias()!=null) {
+ result[i] = new NioX509KeyManager((X509KeyManager)managers[i],getKeyAlias());
+ } else {
+ result[i] = managers[i];
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * Start the NIO2 endpoint: create the executor if needed, initialise the
+ * connection latch, start the acceptor threads and start the async
+ * timeout thread. Does nothing when the endpoint is already running.
+ */
+ @Override
+ public void startInternal() throws Exception {
+
+ if (!running) {
+ running = true;
+ paused = false;
+
+ // FIXME: remove when more stable
+ log.warn("The NIO2 connector is currently EXPERIMENTAL and should not be used in production");
+
+ // Create worker collection
+ if ( getExecutor() == null ) {
+ createExecutor();
+ }
+
+ initializeConnectionLatch();
+ startAcceptorThreads();
+
+ // Start async timeout thread
+ Thread timeoutThread = new Thread(new AsyncTimeout(),
+ getName() + "-AsyncTimeout");
+ timeoutThread.setPriority(threadPriority);
+ // Daemon thread: it exits on its own once "running" becomes false
+ timeoutThread.setDaemon(true);
+ timeoutThread.start();
+ }
+ }
+
+
+ /**
+ * Stop the endpoint. This will cause all processing threads to stop.
+ * The connection latch is released, the endpoint is paused if it was not
+ * already, the running flag is cleared and the acceptor is unlocked, and
+ * the object caches are emptied when caching is enabled.
+ */
+ @Override
+ public void stopInternal() {
+ releaseConnectionLatch();
+ if (!paused) {
+ pause();
+ }
+ if (running) {
+ running = false;
+ unlockAccept();
+ }
+ if (useCaches) {
+ socketWrapperCache.clear();
+ nioChannels.clear();
+ processorCache.clear();
+ }
+ }
+
+
+ /**
+ * Deallocate NIO memory pools, and close server socket.
+ */
+ @Override
+ public void unbind() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Destroy initiated for "+new InetSocketAddress(getAddress(),getPort()));
+ }
+ if (running) {
+ stop();
+ }
+ // Close server socket
+ serverSock.close();
+ serverSock = null;
+ sslContext = null;
+ // Unlike other connectors, the thread pool is tied to the server socket
+ shutdownExecutor();
+ releaseCaches();
+ if (log.isDebugEnabled()) {
+ log.debug("Destroy completed for "+new InetSocketAddress(getAddress(),getPort()));
+ }
+ }
+
+
+ // ------------------------------------------------------ Protected Methods
+
+
+ // Returns the Tx buffer size configured in the socket properties.
+ public int getWriteBufSize() {
+ return socketProperties.getTxBufSize();
+ }
+
+ // Returns the Rx buffer size configured in the socket properties.
+ public int getReadBufSize() {
+ return socketProperties.getRxBufSize();
+ }
+
+ // Returns whether sendfile is enabled for this endpoint.
+ @Override
+ public boolean getUseSendfile() {
+ return useSendfile;
+ }
+
+ // Returns the configured parachute size in bytes.
+ public int getOomParachute() {
+ return oomParachute;
+ }
+
+ // Returns the parachute array itself (not a copy); null when not allocated.
+ public byte[] getOomParachuteData() {
+ return oomParachuteData;
+ }
+
+
+ // Creates the NIO2-specific acceptor used by the acceptor threads.
+ @Override
+ protected AbstractEndpoint.Acceptor createAcceptor() {
+ return new Acceptor();
+ }
+
+
+    /**
+     * Process the specified connection: apply the socket properties, obtain
+     * a channel and a socket wrapper (from the caches when enabled, otherwise
+     * newly created) and dispatch the wrapper for processing with
+     * {@code SocketStatus.OPEN_READ}.
+     *
+     * @param socket the newly accepted socket
+     * @return true if the socket was handed off; false if an error occurred,
+     *         in which case the caller is expected to close the socket
+     */
+    protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
+        // Process the connection
+        try {
+            socketProperties.setProperties(socket);
+
+            Nio2Channel channel = (useCaches) ? nioChannels.pop() : null;
+            if (channel == null) {
+                // SSL setup
+                if (sslContext != null) {
+                    SSLEngine engine = createSSLEngine();
+                    // The application read buffer must be able to hold a full SSL record
+                    int appbufsize = engine.getSession().getApplicationBufferSize();
+                    NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
+                            socketProperties.getAppWriteBufSize(),
+                            socketProperties.getDirectBuffer());
+                    channel = new SecureNio2Channel(socket, engine, bufhandler, this);
+                } else {
+                    // normal tcp setup
+                    NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
+                            socketProperties.getAppWriteBufSize(),
+                            socketProperties.getDirectBuffer());
+
+                    channel = new Nio2Channel(socket, bufhandler);
+                }
+            } else {
+                // Recycled channel: attach the new socket and reset its state
+                channel.setIOChannel(socket);
+                if ( channel instanceof SecureNio2Channel ) {
+                    SSLEngine engine = createSSLEngine();
+                    ((SecureNio2Channel)channel).reset(engine);
+                } else {
+                    channel.reset();
+                }
+            }
+            Nio2SocketWrapper socketWrapper = (useCaches) ? socketWrapperCache.pop() : null;
+            if (socketWrapper == null) {
+                socketWrapper = new Nio2SocketWrapper(channel);
+            }
+            socketWrapper.reset(channel, getSocketProperties().getSoTimeout());
+            socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
+            socketWrapper.setSecure(isSSLEnabled());
+            if (sslContext != null) {
+                ((SecureNio2Channel) channel).setSocket(socketWrapper);
+            }
+            // NOTE(review): processSocket() discards the result of
+            // processSocket0(), so a rejected dispatch is not reported to the
+            // caller of this method - confirm whether that can leak the socket.
+            processSocket(socketWrapper, SocketStatus.OPEN_READ, true);
+            // FIXME: In theory, awaitBytes is better, but the SSL handshake is done by processSocket
+            //awaitBytes(socketWrapper);
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            try {
+                log.error("",t);
+            } catch (Throwable tt) {
+                // Handle the failure raised by logging itself (previously the
+                // outer throwable was passed here a second time).
+                ExceptionUtils.handleThrowable(tt);
+            }
+            // Tell to close the socket
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a server-mode SSL engine from the endpoint's SSL context,
+     * applies the configured client authentication mode, the enabled cipher
+     * suites and protocols, and finally lets the handler customise it.
+     *
+     * @return the configured engine
+     */
+    protected SSLEngine createSSLEngine() {
+        SSLEngine sslEngine = sslContext.createSSLEngine();
+        String clientAuth = getClientAuth();
+        if ("false".equals(clientAuth)) {
+            sslEngine.setNeedClientAuth(false);
+            sslEngine.setWantClientAuth(false);
+        } else if ("true".equals(clientAuth) || "yes".equals(clientAuth)) {
+            sslEngine.setNeedClientAuth(true);
+        } else if ("want".equals(clientAuth)) {
+            sslEngine.setWantClientAuth(true);
+        }
+        sslEngine.setUseClientMode(false);
+        sslEngine.setEnabledCipherSuites(enabledCiphers);
+        sslEngine.setEnabledProtocols(enabledProtocols);
+
+        handler.onCreateSSLEngine(sslEngine);
+        return sslEngine;
+    }
+
+
+ /**
+ * Returns true if a worker thread is available for processing.
+ * This implementation performs no check and always returns true.
+ * @return boolean
+ */
+ protected boolean isWorkerAvailable() {
+ return true;
+ }
+
+
+ // Delegates to processSocket0(); its boolean result is discarded here.
+ @Override
+ public void processSocket(SocketWrapper<Nio2Channel> socketWrapper,
+ SocketStatus socketStatus, boolean dispatch) {
+ processSocket0(socketWrapper, socketStatus, dispatch);
+ }
+
+ /**
+ * Wraps the socket and status in a SocketProcessor (taken from the cache
+ * when caching is enabled) and either submits it to the executor or runs
+ * it on the calling thread.
+ *
+ * @param socket the socket wrapper to process; must be a Nio2SocketWrapper
+ * @param status the status to process the socket with
+ * @param dispatch true to run on the executor (when one is available),
+ * false to run inline on the current thread
+ * @return true if the processor was submitted or run; false if the
+ * executor rejected it or another error occurred
+ */
+ protected boolean processSocket0(SocketWrapper<Nio2Channel> socket, SocketStatus status, boolean dispatch) {
+ try {
+ ((Nio2SocketWrapper) socket).setCometNotify(false); //will get reset upon next reg
+ SocketProcessor sc = (useCaches) ? processorCache.pop() : null;
+ if (sc == null) {
+ sc = new SocketProcessor(socket, status);
+ } else {
+ sc.reset(socket, status);
+ }
+ Executor executor = getExecutor();
+ if (dispatch && executor != null) {
+ executor.execute(sc);
+ } else {
+ sc.run();
+ }
+ } catch (RejectedExecutionException ree) {
+ log.warn(sm.getString("endpoint.executor.fail", socket), ree);
+ return false;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+    /**
+     * Closes the given socket wrapper: notifies a comet request of the
+     * status, releases the handler state for the socket, closes the channel
+     * and any open sendfile channel, resets the wrapper and decrements the
+     * connection count.
+     *
+     * @param socket the socket wrapper to close; a null value is tolerated
+     * @param status the status passed to comet processing; when it is
+     *               {@code SocketStatus.TIMEOUT} and the comet event is
+     *               dispatched successfully, the socket is not closed
+     */
+    public void closeSocket(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
+        try {
+            Nio2SocketWrapper ka = (Nio2SocketWrapper) socket;
+            if (socket != null && socket.isComet() && status != null) {
+                socket.setComet(false);//to avoid a loop
+                if (status == SocketStatus.TIMEOUT ) {
+                    if (processSocket0(socket, status, true)) {
+                        return; // don't close on comet timeout
+                    }
+                } else {
+                    // Don't dispatch: the socket is closed right below, so the
+                    // event has to be processed before that happens
+                    processSocket0(socket, status, false);
+                }
+            }
+            if (socket!=null) handler.release(socket);
+            try {
+                if (socket!=null) {
+                    socket.getSocket().close(true);
+                }
+            } catch (Exception e){
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString(
+                            "endpoint.debug.socketCloseFail"), e);
+                }
+            }
+            try {
+                if (ka != null && ka.getSendfileData() != null
+                        && ka.getSendfileData().fchannel != null
+                        && ka.getSendfileData().fchannel.isOpen()) {
+                    ka.getSendfileData().fchannel.close();
+                }
+            } catch (Exception ignore) {
+                // Best effort: the connection is being torn down anyway
+            }
+            if (ka!=null) {
+                ka.reset();
+                countDownConnection();
+            }
+        } catch (Throwable e) {
+            ExceptionUtils.handleThrowable(e);
+            // Log at the level that the guard actually tests for
+            if (log.isDebugEnabled()) log.debug("",e);
+        }
+    }
+
+ // Returns this class's static logger.
+ @Override
+ protected Log getLog() {
+ return log;
+ }
+
+
+ // --------------------------------------------------- Acceptor Inner Class
+
+ /**
+ * Acceptor thread body. Each loop iteration waits while the endpoint is
+ * paused, waits for a free connection slot, blocks on
+ * serverSock.accept().get() for the next connection and passes the
+ * accepted socket to setSocketOptions(). Sockets that cannot be set up,
+ * or that arrive while the endpoint is stopping or paused, are closed
+ * immediately.
+ * Async timeouts are not checked here: startInternal() starts a separate
+ * AsyncTimeout thread for that.
+ */
+ protected class Acceptor extends AbstractEndpoint.Acceptor {
+
+ @Override
+ public void run() {
+
+ int errorDelay = 0;
+
+ // Loop until we receive a shutdown command
+ while (running) {
+
+ // Loop if endpoint is paused
+ while (paused && running) {
+ state = AcceptorState.PAUSED;
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ if (!running) {
+ break;
+ }
+ state = AcceptorState.RUNNING;
+
+ try {
+ //if we have reached max connections, wait
+ countUpOrAwaitConnection();
+
+ AsynchronousSocketChannel socket = null;
+ try {
+ // Accept the next incoming connection from the server
+ // socket
+ socket = serverSock.accept().get();
+ } catch (Exception e) {
+ // The slot counted above was not used: give it back
+ countDownConnection();
+ // Introduce delay if necessary
+ errorDelay = handleExceptionWithDelay(errorDelay);
+ // re-throw
+ throw e;
+ }
+ // Successful accept, reset the error delay
+ errorDelay = 0;
+
+ // Configure the socket
+ if (running && !paused) {
+ // Hand this socket off to an appropriate processor
+ if (!setSocketOptions(socket)) {
+ countDownConnection();
+ closeSocket(socket);
+ }
+ } else {
+ countDownConnection();
+ // Close socket right away
+ closeSocket(socket);
+ }
+ } catch (NullPointerException npe) {
+ // Only reported while running: serverSock is set to null by
+ // unbind(), so an NPE during shutdown is expected
+ if (running) {
+ log.error(sm.getString("endpoint.accept.fail"), npe);
+ }
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ log.error(sm.getString("endpoint.accept.fail"), t);
+ }
+ }
+ state = AcceptorState.ENDED;
+ }
+
+ }
+
+    /**
+     * Async timeout thread
+     */
+    protected class AsyncTimeout implements Runnable {
+        /**
+         * Background loop that wakes up once per second, scans the sockets
+         * registered in waitingRequests and dispatches a TIMEOUT event for
+         * each one whose idle time exceeds its timeout. While the endpoint
+         * is paused the loop only sleeps. It ends when the endpoint stops
+         * running.
+         */
+        @Override
+        public void run() {
+
+            // Loop until we receive a shutdown command
+            while (running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+                final long now = System.currentTimeMillis();
+                for (SocketWrapper<Nio2Channel> candidate : waitingRequests.keySet()) {
+                    long lastAccess = candidate.getLastAccess();
+                    boolean expired = candidate.getTimeout() > 0
+                            && (now - lastAccess) > candidate.getTimeout();
+                    if (expired) {
+                        processSocket(candidate, SocketStatus.TIMEOUT, true);
+                    }
+                }
+
+                // Loop if endpoint is paused
+                while (paused && running) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+
+            }
+        }
+    }
+
+
+ // Closes a raw accepted socket that was never wrapped (used by the
+ // Acceptor). A failure to close is only logged at debug level.
+ private void closeSocket(AsynchronousSocketChannel socket) {
+ try {
+ socket.close();
+ } catch (IOException ioe) {
+ if (log.isDebugEnabled()) {
+ log.debug("", ioe);
+ }
+ }
+ }
+
+
+    /**
+     * Socket wrapper for NIO2 channels. On top of the base wrapper state it
+     * tracks interest ops, the comet notification flag, optional read and
+     * write latches, pending sendfile data, the write timeout and a one-shot
+     * "upgrade just started" flag.
+     */
+    public static class Nio2SocketWrapper extends SocketWrapper<Nio2Channel> {
+
+        private int interestOps = 0;
+        private boolean cometNotify = false;
+        private CountDownLatch readLatch = null;
+        private CountDownLatch writeLatch = null;
+        private SendfileData sendfileData = null;
+        private long writeTimeout = -1;
+        private boolean upgradeInit = false;
+
+        public Nio2SocketWrapper(Nio2Channel channel) {
+            super(channel);
+        }
+
+        /**
+         * Re-initialises this wrapper for the given channel. Any thread still
+         * waiting on the read or write latch is released before the latches
+         * are dropped.
+         *
+         * @param channel   the channel to wrap, may be null
+         * @param soTimeout the timeout, also applied as the write timeout
+         */
+        public void reset(Nio2Channel channel, long soTimeout) {
+            super.reset(channel, soTimeout);
+            upgradeInit = false;
+            cometNotify = false;
+            interestOps = 0;
+            sendfileData = null;
+            releaseLatch(readLatch);
+            readLatch = null;
+            releaseLatch(writeLatch);
+            writeLatch = null;
+            setWriteTimeout(soTimeout);
+        }
+
+        /**
+         * Counts the latch down to zero. The previous loop compared an
+         * increasing index with the shrinking count, so a latch with a count
+         * of two or more was only half released and waiters stayed blocked.
+         */
+        private static void releaseLatch(CountDownLatch latch) {
+            if (latch == null) {
+                return;
+            }
+            while (latch.getCount() > 0) {
+                latch.countDown();
+            }
+        }
+
+        /** Resets the wrapper with no channel and a timeout of -1. */
+        public void reset() {
+            reset(null, -1);
+        }
+
+        /**
+         * @return the configured timeout, or Long.MAX_VALUE when the
+         *         configured value is not positive
+         */
+        @Override
+        public long getTimeout() {
+            long timeout = super.getTimeout();
+            return (timeout > 0) ? timeout : Long.MAX_VALUE;
+        }
+
+        /** Records the transition to the upgraded state in upgradeInit. */
+        public void setUpgraded(boolean upgraded) {
+            if (upgraded && !isUpgraded()) {
+                upgradeInit = true;
+            }
+            super.setUpgraded(upgraded);
+        }
+
+        /**
+         * One-shot flag: returns true once after the connection has been
+         * switched to upgraded, and clears the flag when read.
+         */
+        public boolean isUpgradeInit() {
+            boolean value = upgradeInit;
+            upgradeInit = false;
+            return value;
+        }
+
+        public void setCometNotify(boolean notify) { this.cometNotify = notify; }
+        public boolean getCometNotify() { return cometNotify; }
+        public Nio2Channel getChannel() { return getSocket();}
+        public int interestOps() { return interestOps;}
+        public int interestOps(int ops) { this.interestOps = ops; return ops; }
+        public CountDownLatch getReadLatch() { return readLatch; }
+        public CountDownLatch getWriteLatch() { return writeLatch; }
+
+        /**
+         * @return always null, so the caller can clear its latch field
+         * @throws IllegalStateException if the latch is still above zero
+         */
+        protected CountDownLatch resetLatch(CountDownLatch latch) {
+            if ( latch==null || latch.getCount() == 0 ) return null;
+            else throw new IllegalStateException("Latch must be at count 0");
+        }
+        public void resetReadLatch() { readLatch = resetLatch(readLatch); }
+        public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
+
+        /**
+         * @return a new latch with the given count
+         * @throws IllegalStateException if the current latch is still above zero
+         */
+        protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
+            if ( latch == null || latch.getCount() == 0 ) {
+                return new CountDownLatch(cnt);
+            }
+            else throw new IllegalStateException("Latch must be at count 0 or null.");
+        }
+        public void startReadLatch(int cnt) { readLatch = startLatch(readLatch,cnt);}
+        public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch,cnt);}
+
+        /**
+         * Waits on the latch for at most the given time.
+         *
+         * @throws IllegalStateException if the latch is null
+         * @throws InterruptedException  if the waiting thread is interrupted
+         */
+        protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
+            if ( latch == null ) throw new IllegalStateException("Latch cannot be null");
+            // Note: While the return value is ignored if the latch does time
+            // out, logic further up the call stack will trigger a
+            // SocketTimeoutException
+            latch.await(timeout,unit);
+        }
+        public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);}
+        public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);}
+
+        public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
+        public SendfileData getSendfileData() { return this.sendfileData;}
+
+        /** A non-positive value is stored as Long.MAX_VALUE. */
+        public void setWriteTimeout(long writeTimeout) {
+            if (writeTimeout <= 0) {
+                this.writeTimeout = Long.MAX_VALUE;
+            } else {
+                this.writeTimeout = writeTimeout;
+            }
+        }
+        public long getWriteTimeout() {return this.writeTimeout;}
+
+    }
+
+ // ------------------------------------------------ Application Buffer Handler
+    /**
+     * Application buffer handler holding one read and one write buffer,
+     * both allocated up front as either direct or heap buffers.
+     */
+    public static class NioBufferHandler implements ApplicationBufferHandler {
+        private ByteBuffer readbuf = null;
+        private ByteBuffer writebuf = null;
+
+        /**
+         * @param readsize  capacity of the read buffer
+         * @param writesize capacity of the write buffer
+         * @param direct    true to allocate direct buffers, false for heap buffers
+         */
+        public NioBufferHandler(int readsize, int writesize, boolean direct) {
+            readbuf = direct ? ByteBuffer.allocateDirect(readsize) : ByteBuffer.allocate(readsize);
+            writebuf = direct ? ByteBuffer.allocateDirect(writesize) : ByteBuffer.allocate(writesize);
+        }
+
+        /** Buffers are never grown: the given buffer is returned unchanged. */
+        @Override
+        public ByteBuffer expand(ByteBuffer buffer, int remaining) {
+            return buffer;
+        }
+
+        @Override
+        public ByteBuffer getReadBuffer() {
+            return readbuf;
+        }
+
+        @Override
+        public ByteBuffer getWriteBuffer() {
+            return writebuf;
+        }
+
+    }
+
+ // ------------------------------------------------ Handler Inner Interface
+
+
+ /**
+ * Bare bones interface used for socket processing. Per thread data is to be
+ * stored in the ThreadWithAttributes extra folders, or alternately in
+ * thread local fields.
+ */
+ public interface Handler extends AbstractEndpoint.Handler {
+ // Processes the socket for the given status; called by SocketProcessor.
+ public SocketState process(SocketWrapper<Nio2Channel> socket,
+ SocketStatus status);
+ // Called from closeSocket() before the channel is closed.
+ public void release(SocketWrapper<Nio2Channel> socket);
+ // Supplies the SSL implementation used by bind() to build the SSL context.
+ public SSLImplementation getSslImplementation();
+ // Called at the end of createSSLEngine() with the configured engine.
+ public void onCreateSSLEngine(SSLEngine engine);
+ }
+
+ // Sockets watched by the AsyncTimeout thread. Used as a concurrent set:
+ // addTimeout() stores each socket as both key and value.
+ protected ConcurrentHashMap<SocketWrapper<Nio2Channel>, SocketWrapper<Nio2Channel>> waitingRequests =
+ new ConcurrentHashMap<>();
+
+ /**
+ * The completion handler used for asynchronous read operations started by
+ * awaitBytes(). A successful read dispatches the socket with OPEN_READ; a
+ * negative byte count (end of stream) or a failure dispatches it with
+ * DISCONNECT.
+ */
+ private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> awaitBytes
+ = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+
+ // NOTE(review): completed() is synchronized on this single shared
+ // handler instance while failed() is not - confirm this is intended.
+ @Override
+ public synchronized void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+ if (nBytes < 0) {
+ // End of stream is handled like a failed read
+ failed(new ClosedChannelException(), attachment);
+ return;
+ }
+ processSocket0(attachment, SocketStatus.OPEN_READ, true);
+ }
+
+ @Override
+ public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+ processSocket0(attachment, SocketStatus.DISCONNECT, true);
+ }
+ };
+
+ // Registers the socket for timeout checks by the AsyncTimeout thread.
+ public void addTimeout(SocketWrapper<Nio2Channel> socket) {
+ waitingRequests.put(socket, socket);
+ }
+
+ // Unregisters the socket from timeout checks; returns true if it was registered.
+ public boolean removeTimeout(SocketWrapper<Nio2Channel> socket) {
+ return waitingRequests.remove(socket) != null;
+ }
+
+ // Marks the current thread as being inside an inline completion section.
+ public static void startInline() {
+ inlineCompletion.set(Boolean.TRUE);
+ }
+
+ // Clears the inline completion mark for the current thread.
+ public static void endInline() {
+ inlineCompletion.set(Boolean.FALSE);
+ }
+
+    /**
+     * @return true if the current thread is between startInline() and
+     *         endInline(); false otherwise, including when the flag was
+     *         never set on this thread
+     */
+    public static boolean isInline() {
+        // equals() is false for null, which covers the "never set" case
+        return Boolean.TRUE.equals(inlineCompletion.get());
+    }
+
+ /**
+ * Starts an asynchronous read into the socket's (cleared) read buffer;
+ * the awaitBytes completion handler is invoked with the result. Does
+ * nothing when the wrapper or its channel is null, or when a read is
+ * already pending on the channel.
+ */
+ public void awaitBytes(SocketWrapper<Nio2Channel> socket) {
+ if (socket == null || socket.getSocket() == null) {
+ return;
+ }
+ ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getReadBuffer();
+ byteBuffer.clear();
+ try {
+ socket.getSocket().read(byteBuffer, socket.getTimeout(),
+ TimeUnit.MILLISECONDS, socket, awaitBytes);
+ } catch (ReadPendingException e) {
+ // Ignore
+ }
+ }
+
+    /**
+     * Starts sending the file described by the socket's sendfile data. The
+     * file channel is opened at {@code data.pos} if needed, a first chunk is
+     * read and written asynchronously, and the completion handler keeps
+     * reading and writing until {@code data.length} bytes have been sent.
+     * When the transfer is complete the socket is either returned to
+     * awaitBytes() (keep-alive) or closed.
+     *
+     * @param socket the socket wrapper; its sendfile data must be set
+     * @return true if the first asynchronous write was started; false if the
+     *         file could not be opened or read (the socket is closed in that
+     *         case) or if the file was already at end of file
+     */
+    public boolean processSendfile(final Nio2SocketWrapper socket) {
+
+        // Configure the send file data
+        SendfileData data = socket.getSendfileData();
+        if (data.fchannel == null || !data.fchannel.isOpen()) {
+            java.nio.file.Path path = new File(data.fileName).toPath();
+            try {
+                // Store the channel before positioning it, so that
+                // closeSocket() can close it if position() fails
+                data.fchannel = java.nio.channels.FileChannel
+                        .open(path, StandardOpenOption.READ);
+                data.fchannel.position(data.pos);
+            } catch (IOException e) {
+                closeSocket(socket, SocketStatus.ERROR);
+                return false;
+            }
+        }
+
+        final ByteBuffer buffer;
+        if (!socketProperties.getDirectBuffer() && sslContext == null) {
+            // If not using SSL and direct buffers are not used, the
+            // idea of sendfile is to avoid memory copies, so allocate a
+            // direct buffer
+            int bufferSize;
+            try {
+                bufferSize = socket.getSocket().getIOChannel().getOption(StandardSocketOptions.SO_SNDBUF);
+            } catch (IOException e) {
+                bufferSize = 8192;
+            }
+            buffer = ByteBuffer.allocateDirect(bufferSize);
+        } else {
+            // NOTE(review): the shared write buffer is used as found (it is
+            // not cleared first) - confirm callers leave it empty
+            buffer = socket.getSocket().getBufHandler().getWriteBuffer();
+        }
+        int nr = -1;
+        try {
+            nr = data.fchannel.read(buffer);
+        } catch (IOException e1) {
+            closeSocket(socket, SocketStatus.ERROR);
+            return false;
+        }
+
+        if (nr >= 0) {
+            socket.getSocket().setSendFile(true);
+            buffer.flip();
+            // Never send more than the requested length. Later chunks are
+            // clamped in the completion handler; the first chunk was not.
+            // A non-positive length keeps the original behaviour.
+            if (data.length > 0 && data.length < buffer.remaining()) {
+                buffer.limit(buffer.position() + (int) data.length);
+            }
+            socket.getSocket().write(buffer, data, new CompletionHandler<Integer, SendfileData>() {
+
+                @Override
+                public void completed(Integer nw, SendfileData attachment) {
+                    if (nw < 0) { // Reach the end of stream
+                        closeSocket(socket, SocketStatus.DISCONNECT);
+                        try {
+                            attachment.fchannel.close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                        return;
+                    }
+
+                    attachment.pos += nw;
+                    attachment.length -= nw;
+
+                    if (attachment.length <= 0) {
+                        // Everything requested has been written
+                        socket.setSendfileData(null);
+                        try {
+                            attachment.fchannel.close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                        if (attachment.keepAlive) {
+                            socket.getSocket().setSendFile(false);
+                            awaitBytes(socket);
+                        } else {
+                            closeSocket(socket, SocketStatus.DISCONNECT);
+                        }
+                        return;
+                    }
+
+                    boolean ok = true;
+
+                    if (!buffer.hasRemaining()) {
+                        // This means that all data in the buffer has
+                        // been
+                        // written => Empty the buffer and read again
+                        buffer.clear();
+                        try {
+                            if (attachment.fchannel.read(buffer) >= 0) {
+                                buffer.flip();
+                                if (attachment.length < buffer.remaining()) {
+                                    buffer.limit(buffer.limit() - buffer.remaining() + (int) attachment.length);
+                                }
+                            } else {
+                                // Reach the EOF
+                                ok = false;
+                            }
+                        } catch (Throwable th) {
+                            if ( log.isDebugEnabled() ) log.debug("Unable to complete sendfile request:", th);
+                            ok = false;
+                        }
+                    }
+
+                    if (ok) {
+                        // Write the rest of the current chunk, or the next one
+                        socket.getSocket().write(buffer, attachment, this);
+                    } else {
+                        try {
+                            attachment.fchannel.close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                        closeSocket(socket, SocketStatus.ERROR);
+                    }
+                }
+
+                @Override
+                public void failed(Throwable exc, SendfileData attachment) {
+                    // Closing channels
+                    closeSocket(socket, SocketStatus.ERROR);
+                    try {
+                        attachment.fchannel.close();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+ // ---------------------------------------------- SocketProcessor Inner Class
+    /**
+     * This class is the equivalent of the Worker, but will simply use in an
+     * external Executor thread pool. Each run performs the handshake if it is
+     * still pending, passes the socket to the handler and closes the socket
+     * when the handler reports it as closed or the handshake fails.
+     * Instances are recycled through processorCache when caching is enabled.
+     */
+    protected class SocketProcessor implements Runnable {
+
+        private SocketWrapper<Nio2Channel> socket = null;
+        private SocketStatus status = null;
+
+        public SocketProcessor(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
+            reset(socket,status);
+        }
+
+        /** Re-arms this processor for another socket and status. */
+        public void reset(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
+            this.socket = socket;
+            this.status = status;
+        }
+
+        @Override
+        public void run() {
+            if (socket == null) {
+                // Nothing to lock on. doRun() treats a missing socket as a
+                // failed handshake and only recycles this processor.
+                doRun();
+                return;
+            }
+            // Upgraded connections need to allow multiple threads to access the
+            // connection at the same time to enable blocking IO to be used when
+            // NIO has been configured
+            if (socket.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
+                synchronized (socket.getWriteThreadLock()) {
+                    doRun();
+                }
+            } else {
+                synchronized (socket) {
+                    doRun();
+                }
+            }
+        }
+
+        private void doRun() {
+            boolean launch = false;
+            try {
+                // -1 = handshake failed or no socket, 0 = handshake complete
+                int handshake = -1;
+
+                try {
+                    if (socket != null && socket.getSocket() != null) {
+                        // For STOP there is no point trying to handshake as the
+                        // Poller has been stopped.
+                        if (socket.getSocket().isHandshakeComplete() ||
+                                status == SocketStatus.STOP) {
+                            handshake = 0;
+                        } else {
+                            handshake = socket.getSocket().handshake();
+                            // The handshake process reads/writes from/to the
+                            // socket. status may therefore be OPEN_WRITE once
+                            // the handshake completes. However, the handshake
+                            // happens when the socket is opened so the status
+                            // must always be OPEN_READ after it completes. It
+                            // is OK to always set this as it is only used if
+                            // the handshake completes.
+                            status = SocketStatus.OPEN_READ;
+                        }
+                    }
+                } catch (IOException x) {
+                    handshake = -1;
+                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
+                }
+                if (handshake == 0) {
+                    SocketState state = SocketState.OPEN;
+                    // Process the request from this socket
+                    if (status == null) {
+                        state = handler.process(socket, SocketStatus.OPEN_READ);
+                    } else {
+                        state = handler.process(socket, status);
+                    }
+                    if (state == SocketState.CLOSED) {
+                        // Close socket and pool
+                        try {
+                            socket.setComet(false);
+                            closeSocket(socket, SocketStatus.ERROR);
+                            // NOTE(review): closeSocket() resets the wrapper with
+                            // a null channel; if that clears the wrapped channel,
+                            // getSocket() returns null here and null is pushed
+                            // into the channel cache - confirm.
+                            if (useCaches && running && !paused) {
+                                nioChannels.push(socket.getSocket());
+                            }
+                            if (useCaches && running && !paused && socket != null) {
+                                socketWrapperCache.push((Nio2SocketWrapper) socket);
+                            }
+                        } catch (Exception x) {
+                            log.error("",x);
+                        }
+                    } else if (state == SocketState.UPGRADING) {
+                        socket.setKeptAlive(true);
+                        socket.access();
+                        launch = true;
+                    }
+                } else if (handshake == -1 ) {
+                    // All socket access stays inside the null check; the
+                    // channel push used to dereference socket unconditionally.
+                    if (socket != null) {
+                        closeSocket(socket, SocketStatus.DISCONNECT);
+                        if (useCaches && running && !paused) {
+                            nioChannels.push(socket.getSocket());
+                        }
+                        if (useCaches && running && !paused) {
+                            socketWrapperCache.push(((Nio2SocketWrapper) socket));
+                        }
+                    }
+                }
+            } catch (OutOfMemoryError oom) {
+                try {
+                    // Drop the parachute first so the cleanup below has memory to work with
+                    oomParachuteData = null;
+                    log.error("", oom);
+                    if (socket != null) {
+                        closeSocket(socket, SocketStatus.ERROR);
+                    }
+                    releaseCaches();
+                } catch (Throwable oomt) {
+                    try {
+                        System.err.println(oomParachuteMsg);
+                        oomt.printStackTrace();
+                    } catch (Throwable letsHopeWeDontGetHere){
+                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
+                    }
+                }
+            } catch (VirtualMachineError vme) {
+                ExceptionUtils.handleThrowable(vme);
+            } catch (Throwable t) {
+                log.error("", t);
+                if (socket != null) {
+                    closeSocket(socket, SocketStatus.ERROR);
+                }
+            } finally {
+                if (launch) {
+                    // The connection is upgrading: process it again with OPEN_READ
+                    try {
+                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+                    } catch (NullPointerException npe) {
+                        if (running) {
+                            log.error(sm.getString("endpoint.launch.fail"),
+                                    npe);
+                        }
+                    }
+                }
+                socket = null;
+                status = null;
+                //return to cache
+                if (useCaches && running && !paused) {
+                    processorCache.push(this);
+                }
+            }
+        }
+    }
+
+ // ----------------------------------------------- SendfileData Inner Class
+ /**
+ * SendfileData class. Mutable holder for the state of one sendfile
+ * transfer; processSendfile() updates pos and length as bytes are written.
+ */
+ public static class SendfileData {
+ // File
+ // Path of the file to send
+ public String fileName;
+ // Channel on the file; opened by processSendfile() when null or closed
+ public FileChannel fchannel;
+ // Position in the file of the next byte to send
+ public long pos;
+ // Number of bytes still to send
+ public long length;
+ // KeepAlive flag
+ // When true the socket goes back to waiting for requests after the
+ // transfer; otherwise it is closed
+ public boolean keepAlive;
+ }
+}
Added: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,911 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+
+/**
+ * Implementation of a secure socket channel for NIO2.
+ */
+public class SecureNio2Channel extends Nio2Channel {
+
+ protected ByteBuffer netInBuffer; // encrypted data read from the socket, input of sslEngine.unwrap
+ protected ByteBuffer netOutBuffer; // encrypted data produced by sslEngine.wrap, written to the socket
+
+ protected SSLEngine sslEngine;
+ protected final Nio2Endpoint endpoint; // notified by the handshake completion handlers
+ protected SocketWrapper<Nio2Channel> socket; // attachment passed to the asynchronous handshake I/O
+
+ protected boolean handshakeComplete = false;
+ protected HandshakeStatus handshakeStatus; //gets set by handshake
+
+ protected boolean closed = false; // set once the SSL close message has been handled or a close was forced
+ protected boolean closing = false; // set as soon as close() has been entered
+ protected boolean readPending = false; // only cleared by reset() in this class
+ protected boolean writePending = false; // only cleared by reset() in this class
+
+ // Handlers used for the asynchronous handshake reads and writes; created in the constructor
+ private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> handshakeReadCompletionHandler;
+ private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> handshakeWriteCompletionHandler;
+
+ /**
+ * Creates a secure channel on top of the given socket channel.
+ * Allocates two direct network buffers sized from the SSL session's packet
+ * buffer size, creates the completion handlers used by the asynchronous
+ * handshake and finally calls {@link #reset()}, which begins the handshake.
+ *
+ * @param channel the underlying socket channel
+ * @param engine the SSL engine used to wrap and unwrap data
+ * @param bufHandler provides the application read and write buffers
+ * @param endpoint0 the endpoint notified when handshake I/O completes or fails
+ * @throws IOException if initialising the channel or beginning the handshake fails
+ */
+ public SecureNio2Channel(AsynchronousSocketChannel channel, SSLEngine engine,
+ ApplicationBufferHandler bufHandler, Nio2Endpoint endpoint0) throws IOException {
+ super(channel, bufHandler);
+ this.sslEngine = engine;
+ this.endpoint = endpoint0;
+ int appBufSize = sslEngine.getSession().getApplicationBufferSize();
+ int netBufSize = sslEngine.getSession().getPacketBufferSize();
+ //allocate network buffers - TODO, add in optional direct non-direct buffers
+ netInBuffer = ByteBuffer.allocateDirect(netBufSize);
+ netOutBuffer = ByteBuffer.allocateDirect(netBufSize);
+
+ // Invoked when an asynchronous handshake read finishes: a negative result
+ // is treated as a failure, anything else hands the socket back to the
+ // endpoint for further processing with OPEN_READ.
+ handshakeReadCompletionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+ @Override
+ public void completed(Integer result, SocketWrapper<Nio2Channel> attachment) {
+ if (result < 0) {
+ failed(new IOException("Error"), attachment);
+ return;
+ }
+ endpoint.processSocket(attachment, SocketStatus.OPEN_READ, false);
+ }
+ @Override
+ public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+ // the exception itself is not used; the socket is closed with ERROR status
+ endpoint.closeSocket(attachment, SocketStatus.ERROR);
+ }
+ };
+ // Same as the read handler, but resumes processing with OPEN_WRITE.
+ handshakeWriteCompletionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+ @Override
+ public void completed(Integer result, SocketWrapper<Nio2Channel> attachment) {
+ if (result < 0) {
+ failed(new IOException("Error"), attachment);
+ return;
+ }
+ endpoint.processSocket(attachment, SocketStatus.OPEN_WRITE, false);
+ }
+ @Override
+ public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+ endpoint.closeSocket(attachment, SocketStatus.ERROR);
+ }
+ };
+
+ //ensure that the application has a large enough read/write buffers
+ //by doing this, we should not encounter any buffer overflow errors
+ // FIXME: this does nothing, so it is in the NIO2 endpoint
+ bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
+ // resets the buffers and flags and begins the handshake
+ reset();
+ }
+
+ /**
+ * Associates this channel with its socket wrapper. The wrapper is passed
+ * as the attachment of the asynchronous handshake reads and writes.
+ *
+ * @param socket the wrapper for this channel
+ */
+ void setSocket(SocketWrapper<Nio2Channel> socket) {
+     this.socket = socket;
+ }
+
+ /**
+ * Replaces the SSL engine and then resets the channel, which begins a new
+ * handshake on the new engine.
+ *
+ * @param engine the SSL engine to use from now on
+ * @throws IOException if the reset fails
+ */
+ public void reset(SSLEngine engine) throws IOException {
+     sslEngine = engine;
+     reset();
+ }
+
+ /**
+ * Returns the channel to its initial state: both network buffers are
+ * emptied, all state flags are cleared and a new handshake is begun on the
+ * current SSL engine.
+ *
+ * @throws IOException if the parent reset or the start of the handshake fails
+ */
+ @Override
+ public void reset() throws IOException {
+     super.reset();
+     // An empty buffer is represented as position == limit == 0
+     netOutBuffer.position(0).limit(0);
+     netInBuffer.position(0).limit(0);
+     // Clear the state flags
+     handshakeComplete = false;
+     closed = closing = false;
+     readPending = writePending = false;
+     // Begin the handshake and remember what the engine wants next
+     sslEngine.beginHandshake();
+     handshakeStatus = sslEngine.getHandshakeStatus();
+ }
+
+ /**
+ * Returns the buffer size reported by the parent channel plus the
+ * capacities of the two network buffers (a missing buffer counts as 0).
+ *
+ * @return the total buffer size in bytes
+ */
+ @Override
+ public int getBufferSize() {
+     int total = super.getBufferSize();
+     if (netInBuffer != null) {
+         total += netInBuffer.capacity();
+     }
+     if (netOutBuffer != null) {
+         total += netOutBuffer.capacity();
+     }
+     return total;
+ }
+
+ /**
+ * Adapts the future of a network write to a Boolean future: the result is
+ * <code>true</code> when the write reported a non-negative byte count.
+ * Cancellation and completion state are delegated to the wrapped future.
+ */
+ private class FutureFlush implements Future<Boolean> {
+     private Future<Integer> pending;
+
+     protected FutureFlush(Future<Integer> integer) {
+         pending = integer;
+     }
+
+     @Override
+     public boolean cancel(boolean mayInterruptIfRunning) {
+         return pending.cancel(mayInterruptIfRunning);
+     }
+
+     @Override
+     public boolean isCancelled() {
+         return pending.isCancelled();
+     }
+
+     @Override
+     public boolean isDone() {
+         return pending.isDone();
+     }
+
+     @Override
+     public Boolean get() throws InterruptedException, ExecutionException {
+         return pending.get() >= 0;
+     }
+
+     @Override
+     public Boolean get(long timeout, TimeUnit unit)
+             throws InterruptedException, ExecutionException, TimeoutException {
+         return pending.get(timeout, unit) >= 0;
+     }
+ }
+
+ /**
+ * Starts writing the content of the outgoing network buffer to the socket.
+ *
+ * @return a future yielding <code>true</code> when the write reported a
+ *         non-negative byte count, <code>false</code> otherwise; it does
+ *         not check whether the network buffer was emptied completely
+ * @throws IOException not thrown by the code of this method itself; kept
+ *         for the overridden signature
+ */
+ @Override
+ public Future<Boolean> flush() throws IOException {
+     Future<Integer> pendingWrite = sc.write(netOutBuffer);
+     return new FutureFlush(pendingWrite);
+ }
+
+ /**
+ * Performs the SSL handshake without blocking on I/O; delegated tasks
+ * (NEED_TASK) are nevertheless run on the calling thread.<br>
+ * Hence, you should never call this method using your Acceptor thread, as
+ * you would slow down your system significantly.<br>
+ * When the handshake cannot finish yet, an asynchronous read or write is
+ * started and its completion handler hands the socket back to the endpoint.
+ *
+ * @return 0 if the handshake is complete, otherwise
+ *         <code>Nio2Endpoint.OP_READ</code> or <code>Nio2Endpoint.OP_WRITE</code>
+ *         depending on the operation that was started
+ * @throws IOException if the handshake fails
+ */
+ @Override
+ public int handshake() throws IOException {
+     final boolean async = true;
+     return handshakeInternal(async);
+ }
+
+ /**
+ * Drives the SSL handshake state machine until it completes or until
+ * network I/O is required.
+ *
+ * @param async <code>true</code> to start the required read or write
+ *        asynchronously (the handshake completion handlers resume
+ *        processing), <code>false</code> to block on it for at most the
+ *        endpoint's socket timeout
+ * @return 0 if the handshake is complete, otherwise
+ *         <code>Nio2Endpoint.OP_READ</code> or <code>Nio2Endpoint.OP_WRITE</code>
+ *         for the I/O operation that was performed or started
+ * @throws EOFException if a blocking read reaches end-of-stream
+ * @throws IOException if the engine reports an unexpected state or a
+ *         blocking I/O operation fails, is interrupted or times out
+ */
+ protected int handshakeInternal(boolean async) throws IOException {
+     if (handshakeComplete) {
+         return 0; //we have done our initial handshake
+     }
+
+     SSLEngineResult handshake = null;
+
+     while (!handshakeComplete) {
+         switch (handshakeStatus) {
+             case NOT_HANDSHAKING: {
+                 //should never happen
+                 throw new IOException("NOT_HANDSHAKING during handshake");
+             }
+             case FINISHED: {
+                 //we are complete if we have delivered the last package
+                 handshakeComplete = !netOutBuffer.hasRemaining();
+                 //return 0 if we are complete, otherwise we still have data to write
+                 if (handshakeComplete) {
+                     return 0;
+                 }
+                 writeHandshakeData(async);
+                 return Nio2Endpoint.OP_WRITE;
+             }
+             case NEED_WRAP: {
+                 //perform the wrap function
+                 handshake = handshakeWrap();
+                 if (handshake.getStatus() == Status.OK) {
+                     if (handshakeStatus == HandshakeStatus.NEED_TASK) {
+                         handshakeStatus = tasks();
+                     }
+                 } else {
+                     //wrap should always work with our buffers
+                     throw new IOException("Unexpected status:" + handshake.getStatus() + " during handshake WRAP.");
+                 }
+                 if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || netOutBuffer.remaining() > 0) {
+                     //should actually return OP_READ if we have NEED_UNWRAP
+                     writeHandshakeData(async);
+                     return Nio2Endpoint.OP_WRITE;
+                 }
+                 //fall down to NEED_UNWRAP on the same call, will result in a
+                 //BUFFER_UNDERFLOW if it needs data
+             }
+             //$FALL-THROUGH$
+             case NEED_UNWRAP: {
+                 //perform the unwrap function
+                 handshake = handshakeUnwrap();
+                 if (handshake.getStatus() == Status.OK) {
+                     if (handshakeStatus == HandshakeStatus.NEED_TASK) {
+                         handshakeStatus = tasks();
+                     }
+                 } else if (handshake.getStatus() == Status.BUFFER_UNDERFLOW) {
+                     //the engine needs more network data
+                     readHandshakeData(async);
+                     return Nio2Endpoint.OP_READ;
+                 } else {
+                     throw new IOException("Invalid handshake status:"+handshakeStatus+" during handshake UNWRAP.");
+                 }
+                 break;
+             }
+             case NEED_TASK: {
+                 handshakeStatus = tasks();
+                 break;
+             }
+             default: throw new IllegalStateException("Invalid handshake status:"+handshakeStatus);
+         }
+     }
+     //the loop is only left normally once the handshake is complete
+     return 0;
+ }
+
+ /**
+ * Writes the pending handshake data from the outgoing network buffer,
+ * either asynchronously or blocking for at most the socket timeout.
+ */
+ private void writeHandshakeData(boolean async) throws IOException {
+     if (async) {
+         sc.write(netOutBuffer, socket, handshakeWriteCompletionHandler);
+         return;
+     }
+     try {
+         sc.write(netOutBuffer).get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS);
+     } catch (InterruptedException | ExecutionException | TimeoutException e) {
+         if (e instanceof InterruptedException) {
+             // keep the interrupt visible to the caller's thread
+             Thread.currentThread().interrupt();
+         }
+         throw new IOException("Handshake error", e);
+     }
+ }
+
+ /**
+ * Reads more handshake data into the incoming network buffer, either
+ * asynchronously or blocking for at most the socket timeout. A blocking
+ * read that hits end-of-stream fails instead of letting the caller retry
+ * forever on a closed connection.
+ */
+ private void readHandshakeData(boolean async) throws IOException {
+     if (async) {
+         sc.read(netInBuffer, socket, handshakeReadCompletionHandler);
+         return;
+     }
+     int read;
+     try {
+         read = sc.read(netInBuffer).get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS);
+     } catch (InterruptedException | ExecutionException | TimeoutException e) {
+         if (e instanceof InterruptedException) {
+             Thread.currentThread().interrupt();
+         }
+         throw new IOException("Handshake error", e);
+     }
+     if (read < 0) {
+         throw new EOFException("EOF during handshake.");
+     }
+ }
+
+ /**
+ * Forces a new handshake and blocks until it has completed.
+ * The network buffers and the application buffers must not hold partially
+ * consumed data when this method is called, otherwise an IOException is
+ * thrown before anything is reset.
+ *
+ * @throws IOException if one of the buffers still contains data or if the
+ *         handshake fails
+ * @throws SocketTimeoutException - if a socket operation timed out
+ */
+ public void rehandshake() throws IOException {
+     //validate the network buffers are empty
+     if (hasUnconsumedData(netInBuffer)) {
+         throw new IOException("Network input buffer still contains data. Handshake will fail.");
+     }
+     if (hasUnconsumedData(netOutBuffer)) {
+         throw new IOException("Network output buffer still contains data. Handshake will fail.");
+     }
+     //validate the application buffers are empty
+     if (hasUnconsumedData(getBufHandler().getReadBuffer())) {
+         throw new IOException("Application input buffer still contains data. Data would have been lost.");
+     }
+     if (hasUnconsumedData(getBufHandler().getWriteBuffer())) {
+         throw new IOException("Application output buffer still contains data. Data would have been lost.");
+     }
+     reset();
+     try {
+         int hsStatus;
+         do {
+             // Any non-zero result other than -1 means blocking I/O took place: iterate
+             hsStatus = handshakeInternal(false);
+             if (hsStatus == -1) {
+                 throw new EOFException("EOF during handshake.");
+             }
+         } while (hsStatus != 0);
+     } catch (IOException x) {
+         throw x;
+     } catch (Exception cx) {
+         throw new IOException(cx);
+     }
+ }
+
+ /**
+ * Tells whether the buffer's position lies strictly between 0 and its limit.
+ */
+ private static boolean hasUnconsumedData(ByteBuffer buffer) {
+     int position = buffer.position();
+     return position > 0 && position < buffer.limit();
+ }
+
+
+
+ /**
+ * Runs every task the SSL engine has delegated, one after the other, on
+ * the calling thread.
+ *
+ * @return the engine's handshake status once no delegated task is left
+ */
+ protected SSLEngineResult.HandshakeStatus tasks() {
+     for (Runnable task = sslEngine.getDelegatedTask();
+             task != null;
+             task = sslEngine.getDelegatedTask()) {
+         task.run();
+     }
+     return sslEngine.getHandshakeStatus();
+ }
+
+ /**
+ * Performs one handshake WRAP: the outgoing network buffer is cleared,
+ * filled by the SSL engine and flipped so that it is ready to be written.
+ * The resulting handshake status is stored in {@link #handshakeStatus}.
+ *
+ * @return the result reported by the SSL engine
+ * @throws IOException if the engine fails to wrap
+ */
+ protected SSLEngineResult handshakeWrap() throws IOException {
+     // Never called while the network buffer still holds data, so it can be reused
+     netOutBuffer.clear();
+     ByteBuffer appData = bufHandler.getWriteBuffer();
+     SSLEngineResult wrapped = sslEngine.wrap(appData, netOutBuffer);
+     // Make the produced bytes readable for the socket write
+     netOutBuffer.flip();
+     handshakeStatus = wrapped.getHandshakeStatus();
+     return wrapped;
+ }
+
+ /**
+ * Performs handshake UNWRAP calls on the incoming network buffer for as
+ * long as the engine reports OK and still asks for NEED_UNWRAP. Delegated
+ * tasks requested by the engine are run in between. The handshake status
+ * after the last call is stored in {@link #handshakeStatus}.
+ *
+ * @return the result of the last unwrap call
+ * @throws IOException if the engine fails to unwrap
+ */
+ protected SSLEngineResult handshakeUnwrap() throws IOException {
+     if (netInBuffer.position() == netInBuffer.limit()) {
+         //clear the buffer if we have emptied it out on data
+         netInBuffer.clear();
+     }
+     while (true) {
+         // Switch the buffer to read mode, let the engine consume it, then
+         // move whatever is left back to the front
+         netInBuffer.flip();
+         SSLEngineResult result = sslEngine.unwrap(netInBuffer, bufHandler.getReadBuffer());
+         netInBuffer.compact();
+
+         handshakeStatus = result.getHandshakeStatus();
+         boolean ok = result.getStatus() == Status.OK;
+         if (ok && handshakeStatus == HandshakeStatus.NEED_TASK) {
+             //execute tasks if we need to
+             handshakeStatus = tasks();
+         }
+         // Stop unless the engine can go on purely with buffered data
+         if (!ok || handshakeStatus != HandshakeStatus.NEED_UNWRAP) {
+             return result;
+         }
+     }
+ }
+
+ /**
+ * Sends a SSL close message, will not physically close the connection here.<br>
+ * Pending network data is flushed first, then the close message is wrapped
+ * and written. Both writes are waited for (at most the endpoint's socket
+ * timeout each) so that the network buffer is only inspected once no write
+ * is in flight any more.<br>
+ * To close the connection, you could do something like
+ * <pre><code>
+ * close();
+ * while (isOpen() && !myTimeoutFunction()) Thread.sleep(25);
+ * if ( isOpen() ) close(true); //forces a close if you timed out
+ * </code></pre>
+ * Calling this method again after the first call has no effect.
+ *
+ * @throws IOException if pending data or the close message cannot be
+ *         written, or if the engine does not reach the CLOSED state
+ */
+ @Override
+ public void close() throws IOException {
+     if (closing) {
+         return;
+     }
+     closing = true;
+     sslEngine.closeOutbound();
+
+     // Get rid of whatever is still waiting in the network buffer
+     flushAndWait("Remaining data in the network buffer, can't send SSL close message, force a close with close(true) instead");
+     //prep the buffer for the close message
+     netOutBuffer.clear();
+     //perform the close, since we called sslEngine.closeOutbound
+     SSLEngineResult handshake = sslEngine.wrap(getEmptyBuf(), netOutBuffer);
+     //we should be in a close state
+     if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
+         throw new IOException("Invalid close state, will not send network data.");
+     }
+     //prepare the buffer for writing
+     netOutBuffer.flip();
+     // Wait for the close message to be written: the buffer must not be
+     // examined while the asynchronous write is still using it
+     flushAndWait("Unable to send the SSL close message, force a close with close(true) instead");
+
+     //is the channel closed?
+     closed = (!netOutBuffer.hasRemaining() && (handshake.getHandshakeStatus() != HandshakeStatus.NEED_WRAP));
+ }
+
+ /**
+ * Flushes the outgoing network buffer and waits for the write to finish,
+ * turning every kind of failure into an IOException with the given message.
+ */
+ private void flushAndWait(String failureMessage) throws IOException {
+     try {
+         if (!flush().get(endpoint.getSoTimeout(), TimeUnit.MILLISECONDS)) {
+             throw new IOException(failureMessage);
+         }
+     } catch (InterruptedException | ExecutionException | TimeoutException e) {
+         throw new IOException(failureMessage, e);
+     }
+ }
+
+ /**
+ * Force a close, can throw an IOException
+ * @param force boolean
+ * @throws IOException
+ */
+ @Override
+ public void close(boolean force) throws IOException {
+ try {
+ close();
+ } finally {
+ if ( force || closed ) {
+ closed = true;
+ sc.close();
+ }
+ }
+ }
+
+ /**
+ * Future returned by read(ByteBuffer) when the incoming network buffer
+ * already holds data: no network read is started, so the future is always
+ * done and cannot be cancelled. The decryption itself happens when get()
+ * is called, on the calling thread, into bufHandler.getReadBuffer().
+ */
+ private class FutureRead implements Future<Integer> {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+ @Override
+ public Integer get() throws InterruptedException, ExecutionException {
+ return unwrap(netInBuffer.position());
+ }
+ // the timeout is not used: nothing is waited for
+ @Override
+ public Integer get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ return unwrap(netInBuffer.position());
+ }
+ /**
+ * Decrypts the content of the incoming network buffer into the
+ * application read buffer.
+ *
+ * @param netread result of the network read, -1 meaning end-of-stream
+ * @return the number of application bytes produced, or -1 when the channel
+ *         is closing or closed or when netread is -1
+ * @throws ExecutionException wrapping the SSLException or IOException if
+ *         the data cannot be unwrapped
+ */
+ protected Integer unwrap(int netread) throws ExecutionException {
+ //are we in the middle of closing or closed?
+ if (closing || closed)
+ return -1;
+ //did we reach EOF? if so send EOF up one layer.
+ if (netread == -1)
+ return -1;
+ //the data read
+ int read = 0;
+ //the SSL engine result
+ SSLEngineResult unwrap;
+ do {
+ //prepare the buffer
+ netInBuffer.flip();
+ //unwrap the data
+ try {
+ unwrap = sslEngine.unwrap(netInBuffer, bufHandler.getReadBuffer());
+ } catch (SSLException e) {
+ throw new ExecutionException(e);
+ }
+ //compact the buffer
+ netInBuffer.compact();
+ if (unwrap.getStatus()==Status.OK || unwrap.getStatus()==Status.BUFFER_UNDERFLOW) {
+ //we did receive some data, add it to our total
+ read += unwrap.bytesProduced();
+ //perform any tasks if needed
+ if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+ tasks();
+ //if we need more network data, then bail out for now.
+ if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW)
+ break;
+ } else if (unwrap.getStatus()==Status.BUFFER_OVERFLOW && read > 0) {
+ //buffer overflow can happen, if we have read data, then
+ //empty out the dst buffer before we do another read
+ break;
+ } else {
+ //here we should trap BUFFER_OVERFLOW and call expand on the buffer
+ //for now, throw an exception, as we initialized the buffers
+ //in the constructor
+ throw new ExecutionException(new IOException("Unable to unwrap data, invalid status: " + unwrap.getStatus()));
+ }
+ } while ((netInBuffer.position() != 0)); //continue to unwrapping as long as the input buffer has stuff
+ return (read);
+ }
+ }
+
+ /**
+ * Future returned by read(ByteBuffer) when the incoming network buffer is
+ * empty: a network read is started on construction and its result is
+ * decrypted by the inherited unwrap method when get() is called.
+ * Cancellation and completion state are those of the network read.
+ */
+ private class FutureNetRead extends FutureRead {
+     private Future<Integer> netRead;
+
+     protected FutureNetRead() {
+         netRead = sc.read(netInBuffer);
+     }
+
+     @Override
+     public boolean cancel(boolean mayInterruptIfRunning) {
+         return netRead.cancel(mayInterruptIfRunning);
+     }
+
+     @Override
+     public boolean isCancelled() {
+         return netRead.isCancelled();
+     }
+
+     @Override
+     public boolean isDone() {
+         return netRead.isDone();
+     }
+
+     @Override
+     public Integer get() throws InterruptedException, ExecutionException {
+         int received = netRead.get();
+         return unwrap(received);
+     }
+
+     @Override
+     public Integer get(long timeout, TimeUnit unit)
+             throws InterruptedException, ExecutionException, TimeoutException {
+         int received = netRead.get(timeout, unit);
+         return unwrap(received);
+     }
+ }
+
+ /**
+ * Reads a sequence of bytes from this channel into the given buffer.
+ *
+ * @param dst The buffer into which bytes are to be transferred
+ * @return The number of bytes read, possibly zero, or <tt>-1</tt> if the channel has reached end-of-stream
+ * @throws IOException If some other I/O error occurs
+ * @throws IllegalArgumentException if the destination buffer is different than bufHandler.getReadBuffer()
+ */
+ @Override
+ public Future<Integer> read(ByteBuffer dst) {
+ //did we finish our handshake?
+ if (!handshakeComplete)
+ throw new IllegalStateException("Handshake incomplete, you must complete handshake before reading data.");
+ if (netInBuffer.position() > 0) {
+ return new FutureRead();
+ } else {
+ return new FutureNetRead();
+ }
+ }
+
+ /**
+ * Future returned by write(ByteBuffer). The source buffer is encrypted
+ * into the outgoing network buffer on construction and, if that worked,
+ * the network write is started. A failure (channel closing, engine error)
+ * is remembered and reported by get(); in that case no network write
+ * exists, so the future is done and can neither be nor have been cancelled.
+ */
+ private class FutureWrite implements Future<Integer> {
+     private Future<Integer> integer = null;
+     private int written = 0;
+     private Throwable t = null;
+
+     protected FutureWrite(ByteBuffer src) {
+         //are we closing or closed?
+         if (closing || closed) {
+             t = new IOException("Channel is in closing state.");
+             return;
+         }
+         //The data buffer should be empty, we can reuse the entire buffer.
+         netOutBuffer.clear();
+         try {
+             SSLEngineResult result = sslEngine.wrap(src, netOutBuffer);
+             written = result.bytesConsumed();
+             netOutBuffer.flip();
+             if (result.getStatus() == Status.OK) {
+                 if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                     tasks();
+                 }
+                 // only touch the network when the engine produced valid output
+                 integer = sc.write(netOutBuffer);
+             } else {
+                 t = new IOException("Unable to wrap data, invalid engine state: " +result.getStatus());
+             }
+         } catch (SSLException e) {
+             t = e;
+         }
+     }
+
+     @Override
+     public boolean cancel(boolean mayInterruptIfRunning) {
+         return integer != null && integer.cancel(mayInterruptIfRunning);
+     }
+
+     @Override
+     public boolean isCancelled() {
+         return integer != null && integer.isCancelled();
+     }
+
+     @Override
+     public boolean isDone() {
+         // without a network write the outcome (a failure) is already known
+         return integer == null || integer.isDone();
+     }
+
+     @Override
+     public Integer get() throws InterruptedException, ExecutionException {
+         if (t != null) {
+             throw new ExecutionException(t);
+         }
+         // NOTE(review): the byte count of the network write is not checked,
+         // so a partial write of the network buffer goes unnoticed here
+         integer.get();
+         return written;
+     }
+
+     @Override
+     public Integer get(long timeout, TimeUnit unit)
+             throws InterruptedException, ExecutionException, TimeoutException {
+         if (t != null) {
+             throw new ExecutionException(t);
+         }
+         integer.get(timeout, unit);
+         return written;
+     }
+ }
+
+ /**
+ * Encrypts a sequence of bytes from the given buffer and starts writing
+ * the result to this channel.
+ *
+ * @param src The buffer from which bytes are to be retrieved
+ * @return a future yielding the number of bytes consumed from
+ *         <code>src</code>, possibly zero; its get methods throw an
+ *         ExecutionException if the channel is closing or the data could
+ *         not be encrypted or written
+ */
+ @Override
+ public Future<Integer> write(ByteBuffer src) {
+     return new FutureWrite(src);
+ }
+
+ private class ReadCompletionHandler<A> implements CompletionHandler<Integer, A> {
+ protected ByteBuffer dst;
+ protected CompletionHandler<Integer, ? super A> handler;
+ protected ReadCompletionHandler(ByteBuffer dst, CompletionHandler<Integer, ? super A> handler) {
+ this.dst = dst;
+ this.handler = handler;
+ }
+
+ @Override
+ public void completed(Integer nBytes, A attach) {
+ if (nBytes < 0) {
+ handler.failed(new ClosedChannelException(), attach);
+ return;
+ }
+ try {
+ //the data read
+ int read = 0;
+ //the SSL engine result
+ SSLEngineResult unwrap;
+ do {
+ //prepare the buffer
+ netInBuffer.flip();
+ //unwrap the data
+ unwrap = sslEngine.unwrap(netInBuffer, dst);
+ //compact the buffer
+ netInBuffer.compact();
+ if (unwrap.getStatus() == Status.OK || unwrap.getStatus() == Status.BUFFER_UNDERFLOW) {
+ //we did receive some data, add it to our total
+ read += unwrap.bytesProduced();
+ //perform any tasks if needed
+ if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+ tasks();
+ //if we need more network data, then bail out for now.
+ if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW)
+ break;
+ } else if (unwrap.getStatus() == Status.BUFFER_OVERFLOW && read > 0) {
+ //buffer overflow can happen, if we have read data, then
+ //empty out the dst buffer before we do another read
+ break;
+ } else {
+ //here we should trap BUFFER_OVERFLOW and call expand on the buffer
+ //for now, throw an exception, as we initialized the buffers
+ //in the constructor
+ throw new IOException("Unable to unwrap data, invalid status: " + unwrap.getStatus());
+ }
+ } while ((netInBuffer.position() != 0)); //continue to unwrapping as long as the input buffer has stuff
+ // If everything is OK, so complete
+ handler.completed(read, attach);
+ } catch (Exception e) {
+ // The operation must fails
+ handler.failed(e, attach);
+ }
+ }
+ @Override
+ public void failed(Throwable exc, A attach) {
+ handler.failed(exc, attach);
+ }
+ }
+
+ /**
+ * Reads and decrypts data into the given buffer and notifies the handler.
+ * Data already present in the incoming network buffer is decrypted right
+ * away on the calling thread; otherwise a network read is started.
+ *
+ * @param dst the buffer receiving the decrypted data
+ * @param timeout the timeout for the network read
+ * @param unit the unit of the timeout
+ * @param attachment the object passed through to the handler
+ * @param handler completed with the number of decrypted bytes, or with -1
+ *        when the channel is closing or closed
+ * @throws IllegalStateException if the handshake has not completed yet
+ */
+ @Override
+ public <A> void read(final ByteBuffer dst,
+         long timeout, TimeUnit unit, final A attachment,
+         final CompletionHandler<Integer, ? super A> handler) {
+     //are we in the middle of closing or closed?
+     if (closing || closed) {
+         handler.completed(-1, attachment);
+         return;
+     }
+     //did we finish our handshake?
+     if (!handshakeComplete) {
+         throw new IllegalStateException("Handshake incomplete, you must complete handshake before reading data.");
+     }
+     ReadCompletionHandler<A> unwrapper = new ReadCompletionHandler<A>(dst, handler);
+     int buffered = netInBuffer.position();
+     if (buffered > 0) {
+         unwrapper.completed(buffered, attachment);
+     } else {
+         sc.read(netInBuffer, timeout, unit, attachment, unwrapper);
+     }
+ }
+
+ /**
+ * Encrypts data from the given buffer and writes it to the socket. The
+ * handler is completed only once the whole encrypted record has been
+ * written, so that the network buffer can safely be reused by the next
+ * write.
+ *
+ * @param src the buffer from which bytes are to be retrieved
+ * @param timeout the timeout for each network write
+ * @param unit the unit of the timeout
+ * @param attachment the object passed through to the handler
+ * @param handler completed with the number of bytes consumed from
+ *        <code>src</code>; failed if the channel is closing, the data
+ *        cannot be encrypted or the network write fails
+ */
+ @Override
+ public <A> void write(final ByteBuffer src, final long timeout, final TimeUnit unit, final A attachment,
+         final CompletionHandler<Integer, ? super A> handler) {
+     //are we closing or closed?
+     if (closing || closed) {
+         handler.failed(new IOException("Channel is in closing state."), attachment);
+         return;
+     }
+
+     try {
+         // Prepare the output buffer
+         this.netOutBuffer.clear();
+         // Wrap the source data into the internal buffer
+         SSLEngineResult result = sslEngine.wrap(src, netOutBuffer);
+         final int written = result.bytesConsumed();
+         netOutBuffer.flip();
+         if (result.getStatus() == Status.OK) {
+             if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                 tasks();
+             }
+         } else {
+             handler.failed(new IOException("Unable to wrap data, invalid engine state: " +result.getStatus()), attachment);
+             return;
+         }
+         // Write data to the channel
+         sc.write(this.netOutBuffer, timeout, unit, attachment,
+                 new CompletionHandler<Integer, A>() {
+             @Override
+             public void completed(Integer nBytes, A attach) {
+                 if (nBytes < 0) {
+                     handler.failed(new ClosedChannelException(), attach);
+                 } else if (netOutBuffer.hasRemaining()) {
+                     // The socket accepted only part of the record: keep
+                     // writing, otherwise the rest would be lost
+                     try {
+                         sc.write(netOutBuffer, timeout, unit, attach, this);
+                     } catch (Throwable exp) {
+                         handler.failed(exp, attach);
+                     }
+                 } else {
+                     // Call the handler completed method with the
+                     // consumed bytes number
+                     handler.completed(written, attach);
+                 }
+             }
+             @Override
+             public void failed(Throwable exc, A attach) {
+                 handler.failed(exc, attach);
+             }
+         });
+     } catch (Throwable exp) {
+         handler.failed(exp, attachment);
+     }
+ }
+
+ /**
+ * Mutable state of one gathering write: the arguments of the call plus the
+ * running byte count and a buffer cursor. The cursor starts at
+ * <code>offset + 1</code>.
+ */
+ private class GatherState<A> {
+     // progress of the operation
+     public long writeCount = 0;
+     public int pos;
+     // arguments of the gathering write
+     public ByteBuffer[] srcs;
+     public int offset;
+     public int length;
+     public long timeout;
+     public TimeUnit unit;
+     public A attachment;
+     public CompletionHandler<Long, ? super A> handler;
+
+     protected GatherState(ByteBuffer[] srcs, int offset, int length,
+             long timeout, TimeUnit unit, A attachment,
+             CompletionHandler<Long, ? super A> handler) {
+         this.handler = handler;
+         this.attachment = attachment;
+         this.unit = unit;
+         this.timeout = timeout;
+         this.length = length;
+         this.offset = offset;
+         this.srcs = srcs;
+         this.pos = offset + 1;
+     }
+ }
+
+ private class GatherCompletionHandler<A> implements CompletionHandler<Integer, GatherState<A>> {
+ protected GatherState<A> state;
+ protected GatherCompletionHandler(GatherState<A> state) {
+ this.state = state;
+ }
+ @Override
+ public void completed(Integer nBytes, GatherState<A> attachment) {
+ if (nBytes < 0) {
+ state.handler.failed(new ClosedChannelException(), state.attachment);
+ } else {
+ if (state.pos == state.offset + state.length) {
+ state.handler.completed(state.writeCount, state.attachment);
+ return;
+ }
+ try {
+ // Prepare the output buffer
+ netOutBuffer.clear();
+ // Wrap the source data into the internal buffer
+ SSLEngineResult result = sslEngine.wrap(state.srcs[state.offset], netOutBuffer);
+ state.writeCount += result.bytesConsumed();
+ netOutBuffer.flip();
+ if (result.getStatus() == Status.OK) {
+ if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
+ tasks();
+ } else {
+ failed(new IOException("Unable to wrap data, invalid engine state: " +result.getStatus()), attachment);
+ return;
+ }
+ state.offset++;
+ // Write data to the channel
+ sc.write(netOutBuffer, state.timeout, state.unit, state, this);
+ } catch (Throwable exp) {
+ failed(exp, attachment);
+ }
+ }
+ }
+ @Override
+ public void failed(Throwable exc, GatherState<A> attachment) {
+ state.handler.failed(exc, state.attachment);
+ }
+ }
+
+ /**
+ * Gathering write: encrypts and writes the buffers
+ * <code>srcs[offset] .. srcs[offset + length - 1]</code>. This method
+ * handles the first buffer; the remaining ones are processed by a
+ * {@link GatherCompletionHandler} as the network writes complete.
+ *
+ * @param srcs the buffers from which bytes are to be retrieved
+ * @param offset index of the first buffer to write
+ * @param length number of buffers to write; with 0 the handler is
+ *        completed immediately with a count of 0
+ * @param timeout the timeout for each network write
+ * @param unit the unit of the timeout
+ * @param attachment the object passed through to the handler
+ * @param handler completed with the total number of bytes consumed, or
+ *        failed if the channel is closing or an error occurs
+ * @throws IndexOutOfBoundsException if offset and length do not describe a
+ *         valid range of <code>srcs</code>
+ */
+ @Override
+ public <A> void write(ByteBuffer[] srcs, int offset, int length,
+         long timeout, TimeUnit unit, A attachment,
+         CompletionHandler<Long, ? super A> handler) {
+     if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) {
+         throw new IndexOutOfBoundsException();
+     }
+     //are we closing or closed?
+     if (closing || closed) {
+         handler.failed(new IOException("Channel is in closing state."), attachment);
+         return;
+     }
+     // An empty range has nothing to send; srcs[offset] is not part of it
+     // and may not even exist
+     if (length == 0) {
+         handler.completed(0L, attachment);
+         return;
+     }
+     try {
+         GatherState<A> state = new GatherState<>(srcs, offset, length,
+                 timeout, unit, attachment, handler);
+         // Prepare the output buffer
+         netOutBuffer.clear();
+         // Wrap the source data into the internal buffer
+         SSLEngineResult result = sslEngine.wrap(srcs[offset], netOutBuffer);
+         state.writeCount += result.bytesConsumed();
+         netOutBuffer.flip();
+         if (result.getStatus() == Status.OK) {
+             if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                 tasks();
+             }
+         } else {
+             handler.failed(new IOException("Unable to wrap data, invalid engine state: " +result.getStatus()), attachment);
+             return;
+         }
+         // Write data to the channel
+         sc.write(netOutBuffer, timeout, unit, state, new GatherCompletionHandler<A>(state));
+     } catch (Throwable exp) {
+         handler.failed(exp, attachment);
+     }
+ }
+
+ /**
+ * Callback interface giving the channel access to the application
+ * buffers and allowing them to be expanded when buffer overflow
+ * exceptions happen.
+ */
+ public interface ApplicationBufferHandler {
+     /**
+      * Asked to expand the given buffer.
+      * NOTE(review): the exact meaning of <code>remaining</code> and of the
+      * returned buffer is defined by the implementations - confirm there.
+      */
+     ByteBuffer expand(ByteBuffer buffer, int remaining);
+
+     /** @return the application buffer that receives decrypted data */
+     ByteBuffer getReadBuffer();
+
+     /** @return the application buffer holding data to be encrypted */
+     ByteBuffer getWriteBuffer();
+ }
+
+ /**
+ * @return the handler providing the application read and write buffers
+ */
+ @Override
+ public ApplicationBufferHandler getBufHandler() {
+     return this.bufHandler;
+ }
+
+ /**
+ * @return <code>true</code> once the SSL handshake has completed
+ */
+ @Override
+ public boolean isHandshakeComplete() {
+     return this.handshakeComplete;
+ }
+
+ /**
+ * @return <code>true</code> once close() has been called on this channel
+ */
+ @Override
+ public boolean isClosing() {
+     return this.closing;
+ }
+
+ /**
+ * @return the SSL engine currently used by this channel
+ */
+ public SSLEngine getSslEngine() {
+     return this.sslEngine;
+ }
+
+ /**
+ * @return the buffer used as the (empty) source when wrapping the SSL
+ *         close message
+ */
+ public ByteBuffer getEmptyBuf() {
+     return this.emptyBuf;
+ }
+
+ /**
+ * Replaces the handler that provides the application buffers.
+ *
+ * @param bufHandler the new application buffer handler
+ */
+ public void setBufHandler(ApplicationBufferHandler bufHandler) {
+     this.bufHandler = bufHandler;
+ }
+
+ /**
+ * @return the underlying asynchronous socket channel
+ */
+ @Override
+ public AsynchronousSocketChannel getIOChannel() {
+     return this.sc;
+ }
+
+}
\ No newline at end of file
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java?rev=1575905&r1=1575904&r2=1575905&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketProperties.java Mon Mar 10 11:27:11 2014
@@ -16,9 +16,13 @@
*/
package org.apache.tomcat.util.net;
+import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
/**
* Properties that can be set in the <Connector> element
@@ -27,13 +31,13 @@ import java.net.SocketException;
*/
public class SocketProperties {
/**
- * Enable/disable key cache, this bounded cache stores
- * KeyAttachment objects to reduce GC
+ * Enable/disable socket wrapper cache, this bounded cache stores
+ * SocketWrapper objects to reduce GC
* Default is 500
* -1 is unlimited
* 0 is disabled
*/
- protected int keyCache = 500;
+ protected int socketWrapperCache = 500;
/**
* Enable/disable socket processor cache, this bounded cache stores
@@ -212,6 +216,27 @@ public class SocketProperties {
socket.setSoTimeout(soTimeout.intValue());
}
+ public void setProperties(AsynchronousSocketChannel socket) throws IOException {
+ if (rxBufSize != null)
+ socket.setOption(StandardSocketOptions.SO_RCVBUF, rxBufSize.intValue());
+ if (txBufSize != null)
+ socket.setOption(StandardSocketOptions.SO_SNDBUF, txBufSize.intValue());
+ if (soKeepAlive != null)
+ socket.setOption(StandardSocketOptions.SO_KEEPALIVE, soKeepAlive.booleanValue());
+ if (soReuseAddress != null)
+ socket.setOption(StandardSocketOptions.SO_REUSEADDR, soReuseAddress.booleanValue());
+ if (soLingerOn != null && soLingerOn.booleanValue() && soLingerTime != null)
+ socket.setOption(StandardSocketOptions.SO_LINGER, soLingerTime.intValue());
+ if (tcpNoDelay != null)
+ socket.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay.booleanValue());
+ }
+
+ /**
+ * Applies the configured receive buffer size and address reuse options to
+ * an asynchronous server socket channel. Options whose property is
+ * <code>null</code> are left untouched.
+ *
+ * @param socket the server channel to configure
+ * @throws IOException if an option cannot be set
+ */
+ public void setProperties(AsynchronousServerSocketChannel socket) throws IOException {
+     if (rxBufSize != null) {
+         socket.setOption(StandardSocketOptions.SO_RCVBUF, rxBufSize.intValue());
+     }
+     if (soReuseAddress != null) {
+         socket.setOption(StandardSocketOptions.SO_REUSEADDR, soReuseAddress.booleanValue());
+     }
+ }
public boolean getDirectBuffer() {
return directBuffer;
@@ -278,7 +303,11 @@ public class SocketProperties {
}
public int getKeyCache() {
- return keyCache;
+ return socketWrapperCache;
+ }
+
+ public int getSocketWrapperCache() {
+ return socketWrapperCache;
}
public int getAppReadBufSize() {
@@ -366,8 +395,12 @@ public class SocketProperties {
this.eventCache = eventCache;
}
+ public void setSocketWrapperCache(int socketWrapperCache) {
+ this.socketWrapperCache = socketWrapperCache;
+ }
+
public void setKeyCache(int keyCache) {
- this.keyCache = keyCache;
+ this.socketWrapperCache = keyCache;
}
public void setAppReadBufSize(int appReadBufSize) {
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1575905&r1=1575904&r2=1575905&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Mon Mar 10 11:27:11 2014
@@ -108,6 +108,10 @@
NIO connector and a request is sent using more than one AJP message.
Patch provided by Amund Elstad. (markt)
</fix>
+ <add>
+ Adds experimental NIO2 connector. Based on code developed by
+ Nabil Benothman. (remm)
+ </add>
</changelog>
</subsection>
<subsection name="Jasper">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org