You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/05/14 16:00:44 UTC

[tomcat] branch master updated: Refactor async IO implementation to SocketWrapperBase

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d6388a  Refactor async IO implementation to SocketWrapperBase
9d6388a is described below

commit 9d6388affba4e9140c50e8ca8938569f8709d008
Author: remm <re...@apache.org>
AuthorDate: Tue May 14 18:00:32 2019 +0200

    Refactor async IO implementation to SocketWrapperBase
    
    Remove all duplicate code I could find, although it is likely there will
    be further tweaks needed.
---
 .../catalina/security/SecurityClassLoad.java       |  10 +-
 java/org/apache/tomcat/util/net/AprEndpoint.java   | 250 ++---------------
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  | 224 ++-------------
 java/org/apache/tomcat/util/net/NioEndpoint.java   | 245 ++---------------
 .../apache/tomcat/util/net/SocketWrapperBase.java  | 301 ++++++++++++++++++++-
 webapps/docs/changelog.xml                         |   4 +
 6 files changed, 379 insertions(+), 655 deletions(-)

diff --git a/java/org/apache/catalina/security/SecurityClassLoad.java b/java/org/apache/catalina/security/SecurityClassLoad.java
index 05272f4..2a44caf 100644
--- a/java/org/apache/catalina/security/SecurityClassLoad.java
+++ b/java/org/apache/catalina/security/SecurityClassLoad.java
@@ -190,16 +190,14 @@ public final class SecurityClassLoad {
         loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableAdd");
         loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableCancel");
         loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableRemove");
-        loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$OperationState");
-        loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$VectoredIOCompletionHandler");
-        loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$OperationState");
-        loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$VectoredIOCompletionHandler");
-        loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$OperationState");
-        loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$VectoredIOCompletionHandler");
+        loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$AprOperationState");
+        loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$NioOperationState");
+        loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$Nio2OperationState");
         loader.loadClass(basePackage + "util.net.SocketWrapperBase$BlockingMode");
         loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionCheck");
         loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionHandlerCall");
         loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionState");
+        loader.loadClass(basePackage + "util.net.SocketWrapperBase$VectoredIOCompletionHandler");
         // security
         loader.loadClass(basePackage + "util.security.PrivilegedGetTccl");
         loader.loadClass(basePackage + "util.security.PrivilegedSetTccl");
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index ad8e16d..d5898fe 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -22,9 +22,6 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.nio.channels.InterruptedByTimeoutException;
-import java.nio.channels.ReadPendingException;
-import java.nio.channels.WritePendingException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -2159,22 +2156,9 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         // This field should only be used by Poller#run()
         private int pollerFlags = 0;
 
-        private final Semaphore readPending;
-        private OperationState<?> readOperation = null;
-        private final Semaphore writePending;
-        private OperationState<?> writeOperation = null;
-
         public AprSocketWrapper(Long socket, AprEndpoint endpoint) {
             super(socket, endpoint);
 
-            if (endpoint.getUseAsyncIO()) {
-                readPending = new Semaphore(1);
-                writePending = new Semaphore(1);
-            } else {
-                readPending = null;
-                writePending = null;
-            }
-
             // TODO Make the socketWriteBuffer size configurable and align the
             //      SSL and app buffer size settings with NIO & NIO2.
             if (endpoint.isSSLEnabled()) {
@@ -2779,59 +2763,32 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         }
 
         @Override
-        public boolean hasAsyncIO() {
-            // The semaphores are only created if async IO is enabled
-            return (readPending != null);
+        protected <A> OperationState<A> newOperationState(boolean read,
+                ByteBuffer[] buffers, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+                Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+            return new AprOperationState<A>(read, buffers, offset, length, block,
+                    timeout, unit, attachment, check, handler, semaphore, completion);
         }
 
-        /**
-         * Internal state tracker for scatter/gather operations.
-         */
-        private class OperationState<A> implements Runnable {
-            private final boolean read;
-            private final ByteBuffer[] buffers;
-            private final int offset;
-            private final int length;
-            private final A attachment;
-            private final BlockingMode block;
-            private final CompletionCheck check;
-            private final CompletionHandler<Long, ? super A> handler;
-            private final Semaphore semaphore;
-            private final VectoredIOCompletionHandler<A> completion;
-            private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
-                    BlockingMode block, A attachment, CompletionCheck check,
+        private class AprOperationState<A>  extends OperationState<A> {
+            private AprOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+                    BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
                     CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
                     VectoredIOCompletionHandler<A> completion) {
-                this.read = read;
-                this.buffers = buffers;
-                this.offset = offset;
-                this.length = length;
-                this.block = block;
-                this.attachment = attachment;
-                this.check = check;
-                this.handler = handler;
-                this.semaphore = semaphore;
-                this.completion = completion;
-            }
-            private volatile boolean inline = true;
-            private volatile long nBytes = 0;
-            private volatile CompletionState state = CompletionState.PENDING;
-            private boolean completionDone = true;
-
-            public boolean process() {
-                try {
-                    getEndpoint().getExecutor().execute(this);
-                } catch (RejectedExecutionException ree) {
-                    log.warn(sm.getString("endpoint.executor.fail", AprSocketWrapper.this) , ree);
-                    return false;
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    // This means we got an OOM or similar creating a thread, or that
-                    // the pool and its queue are full
-                    log.error(sm.getString("endpoint.process.fail"), t);
-                    return false;
-                }
-                return true;
+                super(read, buffers, offset, length, block,
+                        timeout, unit, attachment, check, handler, semaphore, completion);
+            }
+
+            @Override
+            protected boolean isInline() {
+                return inline;
+            }
+
+            @Override
+            protected void start() {
+                run();
             }
 
             @Override
@@ -2914,168 +2871,5 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         }
 
-        @Override
-        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
-        }
-
-        @Override
-        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
-        }
-
-        private <A> CompletionState readOrWrite(boolean read,
-                ByteBuffer[] buffers, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            IOException ioe = getError();
-            if (ioe != null) {
-                handler.failed(ioe, attachment);
-                return CompletionState.ERROR;
-            }
-            if (timeout == -1) {
-                timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
-            } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) {
-                if (read) {
-                    setReadTimeout(unit.toMillis(timeout));
-                } else {
-                    setWriteTimeout(unit.toMillis(timeout));
-                }
-            }
-            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
-                try {
-                    if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
-                        handler.failed(new SocketTimeoutException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                } catch (InterruptedException e) {
-                    handler.failed(e, attachment);
-                    return CompletionState.ERROR;
-                }
-            } else {
-                if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
-                    if (block == BlockingMode.NON_BLOCK) {
-                        return CompletionState.NOT_DONE;
-                    } else {
-                        handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                }
-            }
-            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
-            OperationState<A> state = new OperationState<>(read, buffers, offset, length, block,
-                    attachment, check, handler, read ? readPending : writePending, completion);
-            if (read) {
-                readOperation = state;
-            } else {
-                writeOperation = state;
-            }
-            state.run();
-            if (block == BlockingMode.BLOCK) {
-                synchronized (state) {
-                    if (state.state == CompletionState.PENDING) {
-                        try {
-                            state.wait(unit.toMillis(timeout));
-                            if (state.state == CompletionState.PENDING) {
-                                return CompletionState.ERROR;
-                            }
-                        } catch (InterruptedException e) {
-                            handler.failed(new SocketTimeoutException(), attachment);
-                            return CompletionState.ERROR;
-                        }
-                    }
-                }
-            }
-            return state.state;
-        }
-
-        private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
-            @Override
-            public void completed(Long nBytes, OperationState<A> state) {
-                if (nBytes.longValue() < 0) {
-                    failed(new EOFException(), state);
-                } else {
-                    state.nBytes += nBytes.longValue();
-                    CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE;
-                    boolean complete = true;
-                    boolean completion = true;
-                    if (state.check != null) {
-                        CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
-                        if (call == CompletionHandlerCall.CONTINUE) {
-                            complete = false;
-                        } else if (call == CompletionHandlerCall.NONE) {
-                            completion = false;
-                        }
-                    }
-                    if (complete) {
-                        boolean notify = false;
-                        state.semaphore.release();
-                        if (state.read) {
-                            readOperation = null;
-                        } else {
-                            writeOperation = null;
-                        }
-                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
-                            notify = true;
-                        } else {
-                            state.state = currentState;
-                        }
-                        if (completion && state.handler != null) {
-                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
-                        }
-                        synchronized (state) {
-                            state.completionDone = true;
-                            if (notify) {
-                                state.state = currentState;
-                                state.notify();
-                            }
-                        }
-                    } else {
-                        synchronized (state) {
-                            state.completionDone = true;
-                        }
-                        state.run();
-                    }
-                }
-            }
-            @Override
-            public void failed(Throwable exc, OperationState<A> state) {
-                IOException ioe = null;
-                if (exc instanceof InterruptedByTimeoutException) {
-                    ioe = new SocketTimeoutException();
-                    exc = ioe;
-                } else if (exc instanceof IOException) {
-                    ioe = (IOException) exc;
-                }
-                setError(ioe);
-                boolean notify = false;
-                state.semaphore.release();
-                if (state.read) {
-                    readOperation = null;
-                } else {
-                    writeOperation = null;
-                }
-                if (state.block == BlockingMode.BLOCK) {
-                    notify = true;
-                } else {
-                    state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
-                }
-                if (state.handler != null) {
-                    state.handler.failed(exc, state.attachment);
-                }
-                synchronized (state) {
-                    state.completionDone = true;
-                    if (notify) {
-                        state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
-                        state.notify();
-                    }
-                }
-            }
-        }
-
     }
 }
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 8f2de8d..3c166b9 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -29,15 +29,11 @@ import java.nio.channels.AsynchronousServerSocketChannel;
 import java.nio.channels.AsynchronousSocketChannel;
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
-import java.nio.channels.InterruptedByTimeoutException;
 import java.nio.channels.NetworkChannel;
-import java.nio.channels.ReadPendingException;
-import java.nio.channels.WritePendingException;
 import java.nio.file.StandardOpenOption;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -466,13 +462,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         private SendfileData sendfileData = null;
 
         private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
-        private final Semaphore readPending = new Semaphore(1);
         private boolean readInterest = false; // Guarded by readCompletionHandler
         private boolean readNotify = false;
 
         private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler;
         private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler;
-        private final Semaphore writePending = new Semaphore(1);
         private boolean writeInterest = false; // Guarded by writeCompletionHandler
         private boolean writeNotify = false;
         private volatile boolean closed = false;
@@ -951,63 +945,42 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             return getEndpoint().getUseAsyncIO();
         }
 
-        /**
-         * Internal state tracker for scatter/gather operations.
-         */
-        protected class OperationState<A> implements Runnable {
-            protected final boolean read;
-            protected final ByteBuffer[] buffers;
-            protected final int offset;
-            protected final int length;
-            protected final A attachment;
-            protected final long timeout;
-            protected final TimeUnit unit;
-            protected final BlockingMode block;
-            protected final CompletionCheck check;
-            protected final CompletionHandler<Long, ? super A> handler;
-            protected final Semaphore semaphore;
-            protected final VectoredIOCompletionHandler<A> completion;
-            protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+        @Override
+        public boolean needSemaphores() {
+            return true;
+        }
+
+        @Override
+        public boolean hasPerOperationTimeout() {
+            return false;
+        }
+
+        @Override
+        protected <A> OperationState<A> newOperationState(boolean read,
+                ByteBuffer[] buffers, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+                Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+            return new Nio2OperationState<A>(read, buffers, offset, length, block,
+                    timeout, unit, attachment, check, handler, semaphore, completion);
+        }
+
+        private class Nio2OperationState<A> extends OperationState<A> {
+            private Nio2OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
                     BlockingMode block, long timeout, TimeUnit unit, A attachment,
                     CompletionCheck check, CompletionHandler<Long, ? super A> handler,
                     Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
-                this.read = read;
-                this.buffers = buffers;
-                this.offset = offset;
-                this.length = length;
-                this.block = block;
-                this.timeout = timeout;
-                this.unit = unit;
-                this.attachment = attachment;
-                this.check = check;
-                this.handler = handler;
-                this.semaphore = semaphore;
-                this.completion = completion;
-            }
-            protected volatile long nBytes = 0;
-            protected volatile CompletionState state = CompletionState.PENDING;
-
-            public boolean isInline() {
-                return Nio2Endpoint.isInline();
+                super(read, buffers, offset, length, block,
+                    timeout, unit, attachment, check, handler, semaphore, completion);
             }
 
-            public boolean process() {
-                try {
-                    getEndpoint().getExecutor().execute(this);
-                } catch (RejectedExecutionException ree) {
-                    log.warn(sm.getString("endpoint.executor.fail", Nio2SocketWrapper.this) , ree);
-                    return false;
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    // This means we got an OOM or similar creating a thread, or that
-                    // the pool and its queue are full
-                    log.error(sm.getString("endpoint.process.fail"), t);
-                    return false;
-                }
-                return true;
+            @Override
+            protected boolean isInline() {
+                return Nio2Endpoint.isInline();
             }
 
-            public void start() {
+            @Override
+            protected void start() {
                 if (read) {
                     // Disable any regular read notifications caused by registerReadInterest
                     readNotify = true;
@@ -1051,7 +1024,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                             }
                             @Override
                             public void failed(Throwable exc, Void attachment) {
-                                handler.failed(exc, OperationState.this.attachment);
+                                handler.failed(exc, Nio2OperationState.this.attachment);
                             }
                         });
                         return;
@@ -1062,143 +1035,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             }
         }
 
-        @Override
-        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
-        }
-
-        @Override
-        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
-        }
-
-        private <A> CompletionState readOrWrite(boolean read,
-                ByteBuffer[] buffers, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            IOException ioe = getError();
-            if (ioe != null) {
-                handler.failed(ioe, attachment);
-                return CompletionState.ERROR;
-            }
-            if (timeout == -1) {
-                timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
-            }
-            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
-                try {
-                    if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
-                        handler.failed(new SocketTimeoutException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                } catch (InterruptedException e) {
-                    handler.failed(e, attachment);
-                    return CompletionState.ERROR;
-                }
-            } else {
-                if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
-                    if (block == BlockingMode.NON_BLOCK) {
-                        return CompletionState.NOT_DONE;
-                    } else {
-                        handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                }
-            }
-            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
-            OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, timeout, unit,
-                    attachment, check, handler, read ? readPending : writePending, completion);
-            state.start();
-            if (block == BlockingMode.BLOCK) {
-                synchronized (state) {
-                    if (state.state == CompletionState.PENDING) {
-                        try {
-                            state.wait(unit.toMillis(timeout));
-                            if (state.state == CompletionState.PENDING) {
-                                return CompletionState.ERROR;
-                            }
-                        } catch (InterruptedException e) {
-                            handler.failed(new SocketTimeoutException(), attachment);
-                            return CompletionState.ERROR;
-                        }
-                    }
-                }
-            }
-            return state.state;
-        }
-
-        private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
-            @Override
-            public void completed(Long nBytes, OperationState<A> state) {
-                if (nBytes.longValue() < 0) {
-                    failed(new EOFException(), state);
-                } else {
-                    state.nBytes += nBytes.longValue();
-                    CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
-                    boolean complete = true;
-                    boolean completion = true;
-                    if (state.check != null) {
-                        CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
-                        if (call == CompletionHandlerCall.CONTINUE) {
-                            complete = false;
-                        } else if (call == CompletionHandlerCall.NONE) {
-                            completion = false;
-                        }
-                    }
-                    if (complete) {
-                        boolean notify = false;
-                        state.semaphore.release();
-                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
-                            notify = true;
-                        } else {
-                            state.state = currentState;
-                        }
-                        if (completion && state.handler != null) {
-                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
-                        }
-                        if (notify) {
-                            synchronized (state) {
-                                state.state = currentState;
-                                state.notify();
-                            }
-                        }
-                    } else {
-                        state.run();
-                    }
-                }
-            }
-            @Override
-            public void failed(Throwable exc, OperationState<A> state) {
-                IOException ioe = null;
-                if (exc instanceof InterruptedByTimeoutException) {
-                    ioe = new SocketTimeoutException();
-                    exc = ioe;
-                } else if (exc instanceof IOException) {
-                    ioe = (IOException) exc;
-                }
-                setError(ioe);
-                boolean notify = false;
-                state.semaphore.release();
-                if (state.block == BlockingMode.BLOCK) {
-                    notify = true;
-                } else {
-                    state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
-                }
-                if (state.handler != null) {
-                    state.handler.failed(exc, state.attachment);
-                }
-                if (notify) {
-                    synchronized (state) {
-                        state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
-                        state.notify();
-                    }
-                }
-            }
-        }
-
         /* Callers of this method must:
          * - have acquired the readPending semaphore
          * - have acquired a lock on readCompletionHandler
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index a6f0f62..bdebc73 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -29,19 +29,15 @@ import java.nio.channels.CancelledKeyException;
 import java.nio.channels.Channel;
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
-import java.nio.channels.InterruptedByTimeoutException;
 import java.nio.channels.NetworkChannel;
-import java.nio.channels.ReadPendingException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
-import java.nio.channels.WritePendingException;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1045,10 +1041,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         private int interestOps = 0;
         private CountDownLatch readLatch = null;
         private CountDownLatch writeLatch = null;
-        private final Semaphore readPending;
-        private OperationState<?> readOperation = null;
-        private final Semaphore writePending;
-        private OperationState<?> writeOperation = null;
         private volatile SendfileData sendfileData = null;
         private volatile long lastRead = System.currentTimeMillis();
         private volatile long lastWrite = lastRead;
@@ -1056,13 +1048,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
             super(channel, endpoint);
-            if (endpoint.getUseAsyncIO()) {
-                readPending = new Semaphore(1);
-                writePending = new Semaphore(1);
-            } else {
-                readPending = null;
-                writePending = null;
-            }
             pool = endpoint.getSelectorPool();
             socketBufferHandler = channel.getBufHandler();
         }
@@ -1431,59 +1416,32 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         }
 
         @Override
-        public boolean hasAsyncIO() {
-            // The semaphores are only created if async IO is enabled
-            return (readPending != null);
+        protected <A> OperationState<A> newOperationState(boolean read,
+                ByteBuffer[] buffers, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+                Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+            return new NioOperationState<A>(read, buffers, offset, length, block,
+                    timeout, unit, attachment, check, handler, semaphore, completion);
         }
 
-        /**
-         * Internal state tracker for scatter/gather operations.
-         */
-        private class OperationState<A> implements Runnable {
-            private final boolean read;
-            private final ByteBuffer[] buffers;
-            private final int offset;
-            private final int length;
-            private final A attachment;
-            private final BlockingMode block;
-            private final CompletionCheck check;
-            private final CompletionHandler<Long, ? super A> handler;
-            private final Semaphore semaphore;
-            private final VectoredIOCompletionHandler<A> completion;
-            private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
-                    BlockingMode block, A attachment, CompletionCheck check,
+        private class NioOperationState<A> extends OperationState<A> {
+            private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+                    BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
                     CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
                     VectoredIOCompletionHandler<A> completion) {
-                this.read = read;
-                this.buffers = buffers;
-                this.offset = offset;
-                this.length = length;
-                this.block = block;
-                this.attachment = attachment;
-                this.check = check;
-                this.handler = handler;
-                this.semaphore = semaphore;
-                this.completion = completion;
+                super(read, buffers, offset, length, block,
+                        timeout, unit, attachment, check, handler, semaphore, completion);
             }
-            private volatile boolean inline = true;
-            private volatile long nBytes = 0;
-            private volatile CompletionState state = CompletionState.PENDING;
-            private boolean completionDone = true;
 
-            public boolean process() {
-                try {
-                    getEndpoint().getExecutor().execute(this);
-                } catch (RejectedExecutionException ree) {
-                    log.warn(sm.getString("endpoint.executor.fail", NioSocketWrapper.this) , ree);
-                    return false;
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    // This means we got an OOM or similar creating a thread, or that
-                    // the pool and its queue are full
-                    log.error(sm.getString("endpoint.process.fail"), t);
-                    return false;
-                }
-                return true;
+            @Override
+            protected boolean isInline() {
+                return inline;
+            }
+
+            @Override
+            protected void start() {
+                run();
             }
 
             @Override
@@ -1560,169 +1518,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         }
 
-        @Override
-        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
-        }
-
-        @Override
-        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
-        }
-
-        private <A> CompletionState readOrWrite(boolean read,
-                ByteBuffer[] buffers, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            IOException ioe = getError();
-            if (ioe != null) {
-                handler.failed(ioe, attachment);
-                return CompletionState.ERROR;
-            }
-            if (timeout == -1) {
-                timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
-            } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) {
-                if (read) {
-                    setReadTimeout(unit.toMillis(timeout));
-                } else {
-                    setWriteTimeout(unit.toMillis(timeout));
-                }
-            }
-            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
-                try {
-                    if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
-                        handler.failed(new SocketTimeoutException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                } catch (InterruptedException e) {
-                    handler.failed(e, attachment);
-                    return CompletionState.ERROR;
-                }
-            } else {
-                if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
-                    if (block == BlockingMode.NON_BLOCK) {
-                        return CompletionState.NOT_DONE;
-                    } else {
-                        handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                }
-            }
-            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
-            OperationState<A> state = new OperationState<>(read, buffers, offset, length, block,
-                    attachment, check, handler, read ? readPending : writePending, completion);
-            if (read) {
-                readOperation = state;
-            } else {
-                writeOperation = state;
-            }
-            state.run();
-            if (block == BlockingMode.BLOCK) {
-                synchronized (state) {
-                    if (state.state == CompletionState.PENDING) {
-                        try {
-                            state.wait(unit.toMillis(timeout));
-                            if (state.state == CompletionState.PENDING) {
-                                return CompletionState.ERROR;
-                            }
-                        } catch (InterruptedException e) {
-                            handler.failed(new SocketTimeoutException(), attachment);
-                            return CompletionState.ERROR;
-                        }
-                    }
-                }
-            }
-            return state.state;
-        }
-
-        private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
-            @Override
-            public void completed(Long nBytes, OperationState<A> state) {
-                if (nBytes.longValue() < 0) {
-                    failed(new EOFException(), state);
-                } else {
-                    state.nBytes += nBytes.longValue();
-                    CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE;
-                    boolean complete = true;
-                    boolean completion = true;
-                    if (state.check != null) {
-                        CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
-                        if (call == CompletionHandlerCall.CONTINUE) {
-                            complete = false;
-                        } else if (call == CompletionHandlerCall.NONE) {
-                            completion = false;
-                        }
-                    }
-                    if (complete) {
-                        boolean notify = false;
-                        state.semaphore.release();
-                        if (state.read) {
-                            readOperation = null;
-                        } else {
-                            writeOperation = null;
-                        }
-                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
-                            notify = true;
-                        } else {
-                            state.state = currentState;
-                        }
-                        if (completion && state.handler != null) {
-                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
-                        }
-                        synchronized (state) {
-                            state.completionDone = true;
-                            if (notify) {
-                                state.state = currentState;
-                                state.notify();
-                            }
-                        }
-                    } else {
-                        synchronized (state) {
-                            state.completionDone = true;
-                        }
-                        state.run();
-                    }
-                }
-            }
-            @Override
-            public void failed(Throwable exc, OperationState<A> state) {
-                IOException ioe = null;
-                if (exc instanceof InterruptedByTimeoutException) {
-                    ioe = new SocketTimeoutException();
-                    exc = ioe;
-                } else if (exc instanceof IOException) {
-                    ioe = (IOException) exc;
-                }
-                setError(ioe);
-                boolean notify = false;
-                state.semaphore.release();
-                if (state.read) {
-                    readOperation = null;
-                } else {
-                    writeOperation = null;
-                }
-                if (state.block == BlockingMode.BLOCK) {
-                    notify = true;
-                } else {
-                    state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
-                }
-                if (state.handler != null) {
-                    state.handler.failed(exc, state.attachment);
-                }
-                synchronized (state) {
-                    state.completionDone = true;
-                    if (notify) {
-                        state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
-                        state.notify();
-                    }
-                }
-            }
-        }
-
     }
 
 
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index b09284a..60e383d 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -16,11 +16,17 @@
  */
 package org.apache.tomcat.util.net;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -28,6 +34,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.res.StringManager;
 
 public abstract class SocketWrapperBase<E> {
@@ -93,12 +100,24 @@ public abstract class SocketWrapperBase<E> {
      */
     protected final WriteBuffer nonBlockingWriteBuffer = new WriteBuffer(bufferedWriteSize);
 
+    protected final Semaphore readPending;
+    protected OperationState<?> readOperation = null;
+    protected final Semaphore writePending;
+    protected OperationState<?> writeOperation = null;
+
     public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
         this.socket = socket;
         this.endpoint = endpoint;
         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
         this.blockingStatusReadLock = lock.readLock();
         this.blockingStatusWriteLock = lock.writeLock();
+        if (endpoint.getUseAsyncIO() || needSemaphores()) {
+            readPending = new Semaphore(1);
+            writePending = new Semaphore(1);
+        } else {
+            readPending = null;
+            writePending = null;
+        }
     }
 
     public E getSocket() {
@@ -952,12 +971,191 @@ public abstract class SocketWrapperBase<E> {
     public static final CompletionCheck COMPLETE_READ = COMPLETE_WRITE;
 
     /**
+     * Internal state tracker for vectored operations.
+     */
+    protected abstract class OperationState<A> implements Runnable {
+        protected final boolean read;
+        protected final ByteBuffer[] buffers;
+        protected final int offset;
+        protected final int length;
+        protected final A attachment;
+        protected final long timeout;
+        protected final TimeUnit unit;
+        protected final BlockingMode block;
+        protected final CompletionCheck check;
+        protected final CompletionHandler<Long, ? super A> handler;
+        protected final Semaphore semaphore;
+        protected final VectoredIOCompletionHandler<A> completion;
+        protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+                Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+            this.read = read;
+            this.buffers = buffers;
+            this.offset = offset;
+            this.length = length;
+            this.block = block;
+            this.timeout = timeout;
+            this.unit = unit;
+            this.attachment = attachment;
+            this.check = check;
+            this.handler = handler;
+            this.semaphore = semaphore;
+            this.completion = completion;
+        }
+        protected volatile long nBytes = 0;
+        protected volatile CompletionState state = CompletionState.PENDING;
+        protected volatile boolean inline = true;
+        protected boolean completionDone = true;
+
+        /**
+         * @return true if the operation is still inline, false if the operation
+         *   is running on a thread that is not the original caller
+         */
+        protected abstract boolean isInline();
+
+        /**
+         * Process the operation using the connector executor.
+         * @return true if the operation was accepted, false if the executor
+         *     rejected execurtion
+         */
+        protected boolean process() {
+            try {
+                getEndpoint().getExecutor().execute(this);
+            } catch (RejectedExecutionException ree) {
+                log.warn(sm.getString("endpoint.executor.fail", SocketWrapperBase.this) , ree);
+                return false;
+            } catch (Throwable t) {
+                ExceptionUtils.handleThrowable(t);
+                // This means we got an OOM or similar creating a thread, or that
+                // the pool and its queue are full
+                log.error(sm.getString("endpoint.process.fail"), t);
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Start the operation, this will typically call run.
+         */
+        protected abstract void start();
+
+    }
+
+    /**
+     * Completion handler for vectored operations. This will check the completion of the operation,
+     * then either continue or call the user provided completion handler.
+     */
+    protected class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+        @Override
+        public void completed(Long nBytes, OperationState<A> state) {
+            if (nBytes.longValue() < 0) {
+                failed(new EOFException(), state);
+            } else {
+                state.nBytes += nBytes.longValue();
+                CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+                boolean complete = true;
+                boolean completion = true;
+                if (state.check != null) {
+                    CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
+                    if (call == CompletionHandlerCall.CONTINUE) {
+                        complete = false;
+                    } else if (call == CompletionHandlerCall.NONE) {
+                        completion = false;
+                    }
+                }
+                if (complete) {
+                    boolean notify = false;
+                    state.semaphore.release();
+                    if (state.read) {
+                        readOperation = null;
+                    } else {
+                        writeOperation = null;
+                    }
+                    if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+                        notify = true;
+                    } else {
+                        state.state = currentState;
+                    }
+                    if (completion && state.handler != null) {
+                        state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                    }
+                    synchronized (state) {
+                        state.completionDone = true;
+                        if (notify) {
+                            state.state = currentState;
+                            state.notify();
+                        }
+                    }
+                } else {
+                    synchronized (state) {
+                        state.completionDone = true;
+                    }
+                    state.run();
+                }
+            }
+        }
+        @Override
+        public void failed(Throwable exc, OperationState<A> state) {
+            IOException ioe = null;
+            if (exc instanceof InterruptedByTimeoutException) {
+                ioe = new SocketTimeoutException();
+                exc = ioe;
+            } else if (exc instanceof IOException) {
+                ioe = (IOException) exc;
+            }
+            setError(ioe);
+            boolean notify = false;
+            state.semaphore.release();
+            if (state.read) {
+                readOperation = null;
+            } else {
+                writeOperation = null;
+            }
+            if (state.block == BlockingMode.BLOCK) {
+                notify = true;
+            } else {
+                state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+            }
+            if (state.handler != null) {
+                state.handler.failed(exc, state.attachment);
+            }
+            synchronized (state) {
+                state.completionDone = true;
+                if (notify) {
+                    state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                    state.notify();
+                }
+            }
+        }
+    }
+
+    /**
      * Allows using NIO2 style read/write only for connectors that can
      * efficiently support it.
      *
      * @return This default implementation always returns {@code false}
      */
     public boolean hasAsyncIO() {
+        // The semaphores are only created if async IO is enabled
+        return (readPending != null);
+    }
+
+    /**
+     * Allows indicating if the connector needs semaphores.
+     *
+     * @return This default implementation always returns {@code false}
+     */
+    public boolean needSemaphores() {
+        return false;
+    }
+
+    /**
+     * Allows indicating if the connector supports per operation timeout.
+     *
+     * @return This default implementation always returns {@code false}
+     */
+    public boolean hasPerOperationTimeout() {
         return false;
     }
 
@@ -1086,7 +1284,7 @@ public abstract class SocketWrapperBase<E> {
     public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
             BlockingMode block, long timeout, TimeUnit unit, A attachment,
             CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-        throw new UnsupportedOperationException();
+        return vectoredOperation(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
     }
 
     /**
@@ -1169,12 +1367,111 @@ public abstract class SocketWrapperBase<E> {
     public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
             BlockingMode block, long timeout, TimeUnit unit, A attachment,
             CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-        throw new UnsupportedOperationException();
+        return vectoredOperation(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
+    }
+
+
+    /**
+     * Vectored operation. The completion handler will be called once
+     * the operation is complete or an error occurred. If a CompletionCheck
+     * object has been provided, the completion handler will only be
+     * called if the callHandler method returned true. If no
+     * CompletionCheck object has been provided, the default NIO2
+     * behavior is used: the completion handler will be called, even
+     * if the operation is incomplete, or if the operation completed inline.
+     *
+     * @param read true if the operation is a read, false if it is a write
+     * @param buffers buffers
+     * @param offset in the buffer array
+     * @param length in the buffer array
+     * @param block is the blocking mode that will be used for this operation
+     * @param timeout timeout duration for the write
+     * @param unit units for the timeout duration
+     * @param attachment an object to attach to the I/O operation that will be
+     *        used when calling the completion handler
+     * @param check for the IO operation completion
+     * @param handler to call when the IO is complete
+     * @param <A> The attachment type
+     * @return the completion state (done, done inline, or still pending)
+     */
+    protected <A> CompletionState vectoredOperation(boolean read,
+            ByteBuffer[] buffers, int offset, int length,
+            BlockingMode block, long timeout, TimeUnit unit, A attachment,
+            CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+        IOException ioe = getError();
+        if (ioe != null) {
+            handler.failed(ioe, attachment);
+            return CompletionState.ERROR;
+        }
+        if (timeout == -1) {
+            timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
+        } else if (!hasPerOperationTimeout() && (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout()))) {
+            if (read) {
+                setReadTimeout(unit.toMillis(timeout));
+            } else {
+                setWriteTimeout(unit.toMillis(timeout));
+            }
+        }
+        if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
+            try {
+                if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
+                    handler.failed(new SocketTimeoutException(), attachment);
+                    return CompletionState.ERROR;
+                }
+            } catch (InterruptedException e) {
+                handler.failed(e, attachment);
+                return CompletionState.ERROR;
+            }
+        } else {
+            if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
+                if (block == BlockingMode.NON_BLOCK) {
+                    return CompletionState.NOT_DONE;
+                } else {
+                    handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
+                    return CompletionState.ERROR;
+                }
+            }
+        }
+        VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+        OperationState<A> state = newOperationState(read, buffers, offset, length, block, timeout, unit,
+                attachment, check, handler, read ? readPending : writePending, completion);
+        if (read) {
+            readOperation = state;
+        } else {
+            writeOperation = state;
+        }
+        state.start();
+        if (block == BlockingMode.BLOCK) {
+            synchronized (state) {
+                if (state.state == CompletionState.PENDING) {
+                    try {
+                        state.wait(unit.toMillis(timeout));
+                        if (state.state == CompletionState.PENDING) {
+                            return CompletionState.ERROR;
+                        }
+                    } catch (InterruptedException e) {
+                        handler.failed(new SocketTimeoutException(), attachment);
+                        return CompletionState.ERROR;
+                    }
+                }
+            }
+        }
+        return state.state;
     }
 
+    protected abstract <A> OperationState<A> newOperationState(boolean read,
+            ByteBuffer[] buffers, int offset, int length,
+            BlockingMode block, long timeout, TimeUnit unit, A attachment,
+            CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+            Semaphore semaphore, VectoredIOCompletionHandler<A> completion);
 
     // --------------------------------------------------------- Utility methods
 
+    protected static long toTimeout(long timeout) {
+        // Many calls can't do infinite timeout so use Long.MAX_VALUE if timeout is <= 0
+        return (timeout > 0) ? timeout : Long.MAX_VALUE;
+    }
+
     protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) {
         int max = Math.min(length, to.remaining());
         if (max > 0) {
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 1bbaa2f..459c293 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -108,6 +108,10 @@
       <fix>
         Avoid blocking write of internal buffer when using async IO. (remm)
       </fix>
+      <scode>
+        Refactor async IO implementation to the <code>SocketWrapperBase</code>.
+        (remm)
+      </scode>
     </changelog>
   </subsection>
   <subsection name="Other">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org