You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/05/13 20:58:33 UTC

[tomcat] branch master updated: Add async IO API to APR

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new fdaa8c1  Add async IO API to APR
fdaa8c1 is described below

commit fdaa8c1b9ba5ccc2007e0d2d5547eba8f521b7c8
Author: remm <re...@apache.org>
AuthorDate: Mon May 13 22:58:21 2019 +0200

    Add async IO API to APR
    
    This works from what I can see with the testsuite, but will remain
    disabled by default forever. I'm adding it for consistency only, based
    on the idea it would be easy to implement using the NIO code (it was,
    mostly). The major problems with APR and the design this async API are
    lack of IO vectoring (this could probably be added), and the need to use
    direct buffers. This in turns forces extra flushes on output (maybe the
    amount of flushes can be brought down), plus use of the main buffers
    (which are direct). So the performance of APR ends up being worse than
    NIO(2), while without async it is slightly faster.
---
 java/org/apache/tomcat/util/net/AprEndpoint.java | 390 ++++++++++++++++++++++-
 webapps/docs/changelog.xml                       |   4 +
 webapps/docs/config/http.xml                     |  16 +-
 3 files changed, 385 insertions(+), 25 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index 77c0034..bff1dda 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -21,6 +21,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,6 +32,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -115,6 +121,11 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         // Need to override the default for maxConnections to align it with what
         // was pollerSize (before the two were merged)
         setMaxConnections(8 * 1024);
+        // Asynchronous IO has significantly lower performance with APR:
+        // - no IO vectoring
+        // - mandatory use of direct buffers causing required output buffering
+        // - needs extra output flushes due to the buffering
+        setUseAsyncIO(false);
     }
 
     // ------------------------------------------------------------- Properties
@@ -129,10 +140,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
     public boolean getDeferAccept() { return deferAccept; }
 
 
-    @Override
-    public boolean getUseAsyncIO() { return false; }
-
-
     private boolean ipv6v6only = false;
     public void setIpv6v6only(boolean ipv6v6only) { this.ipv6v6only = ipv6v6only; }
     public boolean getIpv6v6only() { return ipv6v6only; }
@@ -350,12 +357,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
             setUseSendfileInternal(false);
         }
 
-        // Initialize thread count default for acceptor
-        if (acceptorThreadCount == 0) {
-            // FIXME: Doesn't seem to work that well with multiple accept threads
-            acceptorThreadCount = 1;
-        }
-
         // Delay accepting of new connections until data is available
         // Only Linux kernels 2.4 + have that implemented
         // on other platforms this call is noop and will return APR_ENOTIMPL.
@@ -732,8 +733,14 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
      *         socket should be closed
      */
     protected boolean processSocket(long socket, SocketEvent event) {
-        SocketWrapperBase<Long> socketWrapper = connections.get(Long.valueOf(socket));
-        return processSocket(socketWrapper, event, true);
+        AprSocketWrapper socketWrapper = connections.get(Long.valueOf(socket));
+        if (event == SocketEvent.OPEN_READ && socketWrapper.readOperation != null) {
+            return socketWrapper.readOperation.process();
+        } else if (event == SocketEvent.OPEN_WRITE && socketWrapper.writeOperation != null) {
+            return socketWrapper.writeOperation.process();
+        } else {
+            return processSocket(socketWrapper, event, true);
+        }
     }
 
 
@@ -1322,9 +1329,17 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                     log.debug(sm.getString("endpoint.debug.socketTimeout",
                             Long.valueOf(socket)));
                 }
-                SocketWrapperBase<Long> socketWrapper = connections.get(Long.valueOf(socket));
+                AprSocketWrapper socketWrapper = connections.get(Long.valueOf(socket));
                 socketWrapper.setError(new SocketTimeoutException());
-                processSocket(socketWrapper, SocketEvent.ERROR, true);
+                if (socketWrapper.readOperation != null || socketWrapper.writeOperation != null) {
+                    if (socketWrapper.readOperation != null) {
+                        socketWrapper.readOperation.process();
+                    } else {
+                        socketWrapper.writeOperation.process();
+                    }
+                } else {
+                    processSocket(socketWrapper, SocketEvent.ERROR, true);
+                }
                 socket = timeouts.check(date);
             }
 
@@ -1521,7 +1536,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                     // codes plus the flags it was waiting for or it may just
                                     // return an error code. We could handle the error here but
                                     // if we do, there will be no exception associated with the
-                                    // error in application code. By signalling read/write is
+                                    // error in application code. By signaling read/write is
                                     // possible, a read/write will be attempted, fail and that
                                     // will trigger an exception the application will see.
                                     // Check the return flags first, followed by what the socket
@@ -2144,10 +2159,22 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         // This field should only be used by Poller#run()
         private int pollerFlags = 0;
 
+        private final Semaphore readPending;
+        private OperationState<?> readOperation = null;
+        private final Semaphore writePending;
+        private OperationState<?> writeOperation = null;
 
         public AprSocketWrapper(Long socket, AprEndpoint endpoint) {
             super(socket, endpoint);
 
+            if (endpoint.getUseAsyncIO()) {
+                readPending = new Semaphore(1);
+                writePending = new Semaphore(1);
+            } else {
+                readPending = null;
+                writePending = null;
+            }
+
             // TODO Make the socketWriteBuffer size configurable and align the
             //      SSL and app buffer size settings with NIO & NIO2.
             if (endpoint.isSSLEnabled()) {
@@ -2750,5 +2777,338 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 }
             }
         }
+
+        @Override
+        public boolean hasAsyncIO() {
+            // The semaphores are only created if async IO is enabled
+            return (readPending != null);
+        }
+
+        /**
+         * Internal state tracker for scatter/gather operations.
+         */
+        private class OperationState<A> implements Runnable {
+            private final boolean read;
+            private final ByteBuffer[] buffers;
+            private final int offset;
+            private final int length;
+            private final A attachment;
+            private final BlockingMode block;
+            private final CompletionCheck check;
+            private final CompletionHandler<Long, ? super A> handler;
+            private final Semaphore semaphore;
+            private final VectoredIOCompletionHandler<A> completion;
+            private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+                    BlockingMode block, A attachment, CompletionCheck check,
+                    CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
+                    VectoredIOCompletionHandler<A> completion) {
+                this.read = read;
+                this.buffers = buffers;
+                this.offset = offset;
+                this.length = length;
+                this.block = block;
+                this.attachment = attachment;
+                this.check = check;
+                this.handler = handler;
+                this.semaphore = semaphore;
+                this.completion = completion;
+            }
+            private volatile boolean inline = true;
+            private volatile long nBytes = 0;
+            private volatile CompletionState state = CompletionState.PENDING;
+            private boolean completionDone = true;
+
+            public boolean process() {
+                try {
+                    getEndpoint().getExecutor().execute(this);
+                } catch (RejectedExecutionException ree) {
+                    log.warn(sm.getString("endpoint.executor.fail", AprSocketWrapper.this) , ree);
+                    return false;
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    // This means we got an OOM or similar creating a thread, or that
+                    // the pool and its queue are full
+                    log.error(sm.getString("endpoint.process.fail"), t);
+                    return false;
+                }
+                return true;
+            }
+
+            @Override
+            public void run() {
+                // Perform the IO operation
+                // Called from the poller to continue the IO operation
+                long nBytes = 0;
+                if (getError() == null) {
+                    try {
+                        synchronized (this) {
+                            if (!completionDone) {
+                                // This filters out same notification until processing
+                                // of the current one is done
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Skip concurrent " + (read ? "read" : "write") + " notification");
+                                }
+                                return;
+                            }
+                            if (!read && flush(false)) {
+                                inline = false;
+                                registerWriteInterest();
+                                return;
+                            }
+
+                            // Find the buffer on which the operation will be performed (no vectoring with APR)
+                            ByteBuffer buffer = null;
+                            for (int i = 0; i < length; i++) {
+                                if (buffers[i + offset].hasRemaining()) {
+                                    buffer = buffers[i + offset];
+                                    break;
+                                }
+                            }
+                            if (buffer == null ) {
+                                // Nothing to do
+                                return;
+                            }
+                            if (read) {
+                                nBytes = read(false, buffer);
+                            } else {
+                                int remaining = buffer.remaining();
+                                writeNonBlockingDirect(buffer);
+                                nBytes = remaining - buffer.remaining();
+                            }
+                            if (nBytes != 0) {
+                                completionDone = false;
+                            }
+                        }
+                    } catch (IOException e) {
+                        setError(e);
+                    }
+                }
+                if (nBytes > 0) {
+                    try {
+                        if (!read && flush(false)) {
+                            registerWriteInterest();
+                        }
+                    } catch (IOException e) {
+                        // Ignore, will be delayed to later
+                    }
+                    // The bytes read are only updated in the completion handler
+                    completion.completed(Long.valueOf(nBytes), this);
+                } else if (nBytes < 0 || getError() != null) {
+                    IOException error = getError();
+                    if (error == null) {
+                        error = new EOFException();
+                    }
+                    completion.failed(error, this);
+                } else {
+                    // As soon as the operation uses the poller, it is no longer inline
+                    inline = false;
+                    if (read) {
+                        registerReadInterest();
+                    } else {
+                        registerWriteInterest();
+                    }
+                }
+            }
+
+        }
+
+        @Override
+        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
+            if (timeout == -1) {
+                timeout = toTimeout(getReadTimeout());
+            } else if (unit.toMillis(timeout) != getReadTimeout()) {
+                setReadTimeout(unit.toMillis(timeout));
+            }
+            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
+                try {
+                    if (!readPending.tryAcquire(timeout, unit)) {
+                        handler.failed(new SocketTimeoutException(), attachment);
+                        return CompletionState.ERROR;
+                    }
+                } catch (InterruptedException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            } else {
+                if (!readPending.tryAcquire()) {
+                    if (block == BlockingMode.NON_BLOCK) {
+                        return CompletionState.NOT_DONE;
+                    } else {
+                        handler.failed(new ReadPendingException(), attachment);
+                        return CompletionState.ERROR;
+                    }
+                }
+            }
+            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+            OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
+                    attachment, check, handler, readPending, completion);
+            readOperation = state;
+            state.run();
+            if (block == BlockingMode.BLOCK) {
+                synchronized (state) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            state.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
+                }
+            }
+            return state.state;
+        }
+
+        @Override
+        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
+            if (timeout == -1) {
+                timeout = toTimeout(getWriteTimeout());
+            } else if (unit.toMillis(timeout) != getWriteTimeout()) {
+                setWriteTimeout(unit.toMillis(timeout));
+            }
+            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
+                try {
+                    if (!writePending.tryAcquire(timeout, unit)) {
+                        handler.failed(new SocketTimeoutException(), attachment);
+                        return CompletionState.ERROR;
+                    }
+                } catch (InterruptedException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            } else {
+                if (!writePending.tryAcquire()) {
+                    if (block == BlockingMode.NON_BLOCK) {
+                        return CompletionState.NOT_DONE;
+                    } else {
+                        handler.failed(new WritePendingException(), attachment);
+                        return CompletionState.ERROR;
+                    }
+                }
+            }
+            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+            OperationState<A> state = new OperationState<>(false, srcs, offset, length, block,
+                    attachment, check, handler, writePending, completion);
+            writeOperation = state;
+            state.run();
+            if (block == BlockingMode.BLOCK) {
+                synchronized (state) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            state.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
+                }
+            }
+            return state.state;
+        }
+
+        private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+            @Override
+            public void completed(Long nBytes, OperationState<A> state) {
+                if (nBytes.longValue() < 0) {
+                    failed(new EOFException(), state);
+                } else {
+                    state.nBytes += nBytes.longValue();
+                    CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE;
+                    boolean complete = true;
+                    boolean completion = true;
+                    if (state.check != null) {
+                        CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
+                        if (call == CompletionHandlerCall.CONTINUE) {
+                            complete = false;
+                        } else if (call == CompletionHandlerCall.NONE) {
+                            completion = false;
+                        }
+                    }
+                    if (complete) {
+                        boolean notify = false;
+                        state.semaphore.release();
+                        if (state.read) {
+                            readOperation = null;
+                        } else {
+                            writeOperation = null;
+                        }
+                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+                            notify = true;
+                        } else {
+                            state.state = currentState;
+                        }
+                        if (completion && state.handler != null) {
+                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                        }
+                        synchronized (state) {
+                            state.completionDone = true;
+                            if (notify) {
+                                state.state = currentState;
+                                state.notify();
+                            }
+                        }
+                    } else {
+                        synchronized (state) {
+                            state.completionDone = true;
+                        }
+                        state.run();
+                    }
+                }
+            }
+            @Override
+            public void failed(Throwable exc, OperationState<A> state) {
+                IOException ioe = null;
+                if (exc instanceof InterruptedByTimeoutException) {
+                    ioe = new SocketTimeoutException();
+                    exc = ioe;
+                } else if (exc instanceof IOException) {
+                    ioe = (IOException) exc;
+                }
+                setError(ioe);
+                boolean notify = false;
+                state.semaphore.release();
+                if (state.read) {
+                    readOperation = null;
+                } else {
+                    writeOperation = null;
+                }
+                if (state.block == BlockingMode.BLOCK) {
+                    notify = true;
+                } else {
+                    state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
+                }
+                if (state.handler != null) {
+                    state.handler.failed(exc, state.attachment);
+                }
+                synchronized (state) {
+                    state.completionDone = true;
+                    if (notify) {
+                        state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
+                        state.notify();
+                    }
+                }
+            }
+        }
+
     }
 }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 9043f9c..c8f8515 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -96,6 +96,10 @@
         Remove <code>pollerThreadCount</code> Connector attribute for NIO,
         one poller thread is sufficient. (remm)
       </update>
+      <add>
+        Add async IO for APR connector for consistency, but disable it by
+        default due to low performance. (remm)
+      </add>
     </changelog>
   </subsection>
   <subsection name="Other">
diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml
index ce1b6cf..a2dccfe 100644
--- a/webapps/docs/config/http.xml
+++ b/webapps/docs/config/http.xml
@@ -639,6 +639,12 @@
       system property.</p>
     </attribute>
 
+    <attribute name="useAsyncIO" required="false">
+      <p>(bool)Use this attribute to enable or disable usage of the
+      asynchronous IO API. The default value is <code>true</code> except when
+      using the APR connector due to low performance.</p>
+    </attribute>
+
   </attributes>
 
   </subsection>
@@ -740,11 +746,6 @@
         default value is <code>1000</code> milliseconds.</p>
       </attribute>
 
-      <attribute name="useAsyncIO" required="false">
-        <p>(bool)Use this attribute to enable or disable usage of the
-        asynchronous IO API. The default value is <code>true</code>.</p>
-      </attribute>
-
       <attribute name="useSendfile" required="false">
         <p>(bool)Use this attribute to enable or disable sendfile capability.
         The default value is <code>true</code>. Note that the use of sendfile
@@ -894,11 +895,6 @@
 
     <attributes>
 
-      <attribute name="useAsyncIO" required="false">
-        <p>(bool)Use this attribute to enable or disable usage of the
-        asynchronous IO API. The default value is <code>true</code>.</p>
-      </attribute>
-
       <attribute name="useSendfile" required="false">
         <p>(bool)Use this attribute to enable or disable sendfile capability.
         The default value is <code>true</code>. Note that the use of sendfile


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org