You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2019/11/19 14:26:55 UTC
[tomcat] 01/02: Refactor APR Poller to remove use of multiple
pollsets
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 4508e70e4fcf542c6071ef77c13cd7141abb9bf4
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Tue Nov 19 13:18:36 2019 +0000
Refactor APR Poller to remove use of multiple pollsets
---
java/org/apache/tomcat/util/net/AprEndpoint.java | 409 +++++++++--------------
webapps/docs/changelog.xml | 9 +
2 files changed, 165 insertions(+), 253 deletions(-)
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index 5af9fe0..4821279 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -1061,36 +1061,14 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
public class Poller implements Runnable {
/**
- * Pointers to the pollers.
+ * Pointer to the poller.
*/
- private long[] pollers = null;
+ private long aprPoller;
/**
* Actual poller size.
*/
- private int actualPollerSize = 0;
-
- /**
- * Amount of spots left in the poller.
- */
- private int[] pollerSpace = null;
-
- /**
- * Amount of low level pollers in use by this poller.
- */
- private int pollerCount;
-
- /**
- * Timeout value for the poll call.
- */
- private int pollerTime;
-
- /**
- * Variable poller timeout that adjusts depending on how many poll sets
- * are in use so that the total poll time across all poll sets remains
- * equal to pollTime.
- */
- private int nextPollerTime;
+ private int pollerSize = 0;
/**
* Root pool.
@@ -1144,55 +1122,18 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
private volatile boolean pollerRunning = true;
/**
- * Create the poller. With some versions of APR, the maximum poller size
- * will be 62 (recompiling APR is necessary to remove this limitation).
+ * Create the poller.
*/
protected synchronized void init() {
pool = Pool.create(serverSockPool);
-
- // Single poller by default
- int defaultPollerSize = getMaxConnections();
-
- if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) {
- // The maximum per poller to get reasonable performance is 1024
- // Adjust poller size so that it won't reach the limit. This is
- // a limitation of XP / Server 2003 that has been fixed in
- // Vista / Server 2008 onwards.
- actualPollerSize = 1024;
- } else {
- actualPollerSize = defaultPollerSize;
- }
-
- timeouts = new SocketTimeouts(defaultPollerSize);
+ pollerSize = getMaxConnections();
+ timeouts = new SocketTimeouts(pollerSize);
// At the moment, setting the timeout is useless, but it could get
// used again as the normal poller could be faster using maintain.
// It might not be worth bothering though.
- long pollset = allocatePoller(actualPollerSize, pool, -1);
- if (pollset == 0 && actualPollerSize > 1024) {
- actualPollerSize = 1024;
- pollset = allocatePoller(actualPollerSize, pool, -1);
- }
- if (pollset == 0) {
- actualPollerSize = 62;
- pollset = allocatePoller(actualPollerSize, pool, -1);
- }
-
- pollerCount = defaultPollerSize / actualPollerSize;
- pollerTime = pollTime / pollerCount;
- nextPollerTime = pollerTime;
-
- pollers = new long[pollerCount];
- pollers[0] = pollset;
- for (int i = 1; i < pollerCount; i++) {
- pollers[i] = allocatePoller(actualPollerSize, pool, -1);
- }
-
- pollerSpace = new int[pollerCount];
- for (int i = 0; i < pollerCount; i++) {
- pollerSpace[i] = actualPollerSize;
- }
+ aprPoller = allocatePoller(pollerSize, pool, -1);
/*
* x2 - One descriptor for the socket, one for the event(s).
@@ -1201,12 +1142,12 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
* for a maximum of two events (read and write) at any one
* time.
*
- * Therefore size is actual poller size *4.
+ * Therefore size is poller size *4.
*/
- desc = new long[actualPollerSize * 4];
+ desc = new long[pollerSize * 4];
connectionCount.set(0);
- addList = new SocketList(defaultPollerSize);
- closeList = new SocketList(defaultPollerSize);
+ addList = new SocketList(pollerSize);
+ closeList = new SocketList(pollerSize);
}
@@ -1228,7 +1169,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
// still in the poller can cause problems
try {
this.notify();
- this.wait(pollerCount * pollTime / 1000);
+ this.wait(pollTime / 1000);
} catch (InterruptedException e) {
// Ignore
}
@@ -1257,12 +1198,10 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
}
addList.clear();
// Close all sockets still in the poller
- for (int i = 0; i < pollerCount; i++) {
- int rv = Poll.pollset(pollers[i], desc);
- if (rv > 0) {
- for (int n = 0; n < rv; n++) {
- destroySocket(desc[n*2+1]);
- }
+ int rv = Poll.pollset(aprPoller, desc);
+ if (rv > 0) {
+ for (int n = 0; n < rv; n++) {
+ destroySocket(desc[n*2+1]);
}
}
Pool.destroy(pool);
@@ -1303,6 +1242,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled.
if (addList.add(socket, timeout, flags)) {
+ // In case the poller thread is in the idle wait
this.notify();
}
}
@@ -1314,16 +1254,10 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
* {@link Poller#run()}.
*/
private boolean addToPoller(long socket, int events) {
- int rv = -1;
- for (int i = 0; i < pollers.length; i++) {
- if (pollerSpace[i] > 0) {
- rv = Poll.add(pollers[i], socket, events);
- if (rv == Status.APR_SUCCESS) {
- pollerSpace[i]--;
- connectionCount.incrementAndGet();
- return true;
- }
- }
+ int rv = Poll.add(aprPoller, socket, events);
+ if (rv == Status.APR_SUCCESS) {
+ connectionCount.incrementAndGet();
+ return true;
}
return false;
}
@@ -1336,6 +1270,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
*/
private synchronized void close(long socket) {
closeList.add(socket, 0, 0);
+ // In case the poller thread is in the idle wait
this.notify();
}
@@ -1349,24 +1284,18 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
log.debug(sm.getString("endpoint.debug.pollerRemove",
Long.valueOf(socket)));
}
- int rv = -1;
- for (int i = 0; i < pollers.length; i++) {
- if (pollerSpace[i] < actualPollerSize) {
- rv = Poll.remove(pollers[i], socket);
- if (rv != Status.APR_NOTFOUND) {
- pollerSpace[i]++;
- connectionCount.decrementAndGet();
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.debug.pollerRemoved",
- Long.valueOf(socket)));
- }
- break;
- }
+ int rv = Poll.remove(aprPoller, socket);
+ if (rv != Status.APR_NOTFOUND) {
+ connectionCount.decrementAndGet();
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("endpoint.debug.pollerRemoved",
+ Long.valueOf(socket)));
}
}
timeouts.remove(socket);
}
+
/**
* Timeout checks. Must only be called from {@link Poller#run()}.
*/
@@ -1400,15 +1329,13 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
public String toString() {
StringBuffer buf = new StringBuffer();
buf.append("Poller");
- long[] res = new long[actualPollerSize * 2];
- for (int i = 0; i < pollers.length; i++) {
- int count = Poll.pollset(pollers[i], res);
- buf.append(" [ ");
- for (int j = 0; j < count; j++) {
- buf.append(desc[2*j+1]).append(" ");
- }
- buf.append("]");
+ long[] res = new long[pollerSize * 2];
+ int count = Poll.pollset(aprPoller, res);
+ buf.append(" [ ");
+ for (int j = 0; j < count; j++) {
+ buf.append(desc[2*j+1]).append(" ");
}
+ buf.append("]");
return buf.toString();
}
@@ -1533,170 +1460,146 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
}
}
- // Poll for the specified interval
- for (int i = 0; i < pollers.length; i++) {
+ // Flag to ask to reallocate the pool
+ boolean reset = false;
- // Flag to ask to reallocate the pool
- boolean reset = false;
-
- int rv = 0;
- // Reset the nextPollerTime
- nextPollerTime = pollerTime;
- // Iterate on each pollers, but no need to poll empty pollers
- if (pollerSpace[i] < actualPollerSize) {
- rv = Poll.poll(pollers[i], nextPollerTime, desc, true);
- // Reset the nextPollerTime
- nextPollerTime = pollerTime;
- } else {
- // Skipping an empty poll set means skipping a wait
- // time of pollerTime microseconds. If most of the
- // poll sets are skipped then this loop will be
- // tighter than expected which could lead to higher
- // than expected CPU usage. Extending the
- // nextPollerTime ensures that this loop always
- // takes about the same time to execute.
- nextPollerTime += pollerTime;
- }
- if (rv > 0) {
- rv = mergeDescriptors(desc, rv);
- pollerSpace[i] += rv;
- connectionCount.addAndGet(-rv);
- for (int n = 0; n < rv; n++) {
- if (getLog().isDebugEnabled()) {
- log.debug(sm.getString(
- "endpoint.debug.pollerProcess",
- Long.valueOf(desc[n*2+1]),
- Long.valueOf(desc[n*2])));
- }
- long timeout = timeouts.remove(desc[n*2+1]);
- AprSocketWrapper wrapper = connections.get(
- Long.valueOf(desc[n*2+1]));
- if (wrapper == null) {
- // Socket was closed in another thread while still in
- // the Poller but wasn't removed from the Poller before
- // new data arrived.
- continue;
- }
- wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
- // Check for failed sockets and hand this socket off to a worker
- if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
- || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
- || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
- // Need to trigger error handling. Poller may return error
- // codes plus the flags it was waiting for or it may just
- // return an error code. We could handle the error here but
- // if we do, there will be no exception associated with the
- // error in application code. By signalling read/write is
- // possible, a read/write will be attempted, fail and that
- // will trigger an exception the application will see.
- // Check the return flags first, followed by what the socket
- // was registered for
- if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
- // Error probably occurred during a non-blocking read
- if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
- // Error probably occurred during a non-blocking write
- if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
- // Can't tell what was happening when the error occurred but the
- // socket is registered for non-blocking read so use that
- if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
- // Can't tell what was happening when the error occurred but the
- // socket is registered for non-blocking write so use that
- if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else {
+ int rv = Poll.poll(aprPoller, pollTime, desc, true);
+ if (rv > 0) {
+ rv = mergeDescriptors(desc, rv);
+ connectionCount.addAndGet(-rv);
+ for (int n = 0; n < rv; n++) {
+ if (getLog().isDebugEnabled()) {
+ log.debug(sm.getString(
+ "endpoint.debug.pollerProcess",
+ Long.valueOf(desc[n*2+1]),
+ Long.valueOf(desc[n*2])));
+ }
+ long timeout = timeouts.remove(desc[n*2+1]);
+ AprSocketWrapper wrapper = connections.get(
+ Long.valueOf(desc[n*2+1]));
+ if (wrapper == null) {
+ // Socket was closed in another thread while still in
+ // the Poller but wasn't removed from the Poller before
+ // new data arrived.
+ continue;
+ }
+ wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
+ // Check for failed sockets and hand this socket off to a worker
+ if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
+ || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
+ // Need to trigger error handling. Poller may return error
+ // codes plus the flags it was waiting for or it may just
+ // return an error code. We could handle the error here but
+ // if we do, there will be no exception associated with the
+ // error in application code. By signalling read/write is
+ // possible, a read/write will be attempted, fail and that
+ // will trigger an exception the application will see.
+ // Check the return flags first, followed by what the socket
+ // was registered for
+ if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+ // Error probably occurred during a non-blocking read
+ if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
// Close socket and clear pool
closeSocket(desc[n*2+1]);
}
- } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN)
- || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) {
- boolean error = false;
- if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) &&
- !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
- error = true;
+ } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+ // Error probably occurred during a non-blocking write
+ if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
// Close socket and clear pool
closeSocket(desc[n*2+1]);
}
- if (!error &&
- ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) &&
- !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
+ } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+ // Can't tell what was happening when the error occurred but the
+ // socket is registered for non-blocking read so use that
+ if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
// Close socket and clear pool
- error = true;
closeSocket(desc[n*2+1]);
}
- if (!error && wrapper.pollerFlags != 0) {
- // If socket was registered for multiple events but
- // only some of the occurred, re-register for the
- // remaining events.
- // timeout is the value of System.currentTimeMillis() that
- // was set as the point that the socket will timeout. When
- // adding to the poller, the timeout from now in
- // milliseconds is required.
- // So first, subtract the current timestamp
- if (timeout > 0) {
- timeout = timeout - System.currentTimeMillis();
- }
- // If the socket should have already expired by now,
- // re-add it with a very short timeout
- if (timeout <= 0) {
- timeout = 1;
- }
- // Should be impossible but just in case since timeout will
- // be cast to an int.
- if (timeout > Integer.MAX_VALUE) {
- timeout = Integer.MAX_VALUE;
- }
- add(desc[n*2+1], (int) timeout, wrapper.pollerFlags);
+ } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+ // Can't tell what was happening when the error occurred but the
+ // socket is registered for non-blocking write so use that
+ if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
+ // Close socket and clear pool
+ closeSocket(desc[n*2+1]);
}
} else {
- // Unknown event
- getLog().warn(sm.getString(
- "endpoint.apr.pollUnknownEvent",
- Long.valueOf(desc[n*2])));
// Close socket and clear pool
closeSocket(desc[n*2+1]);
}
- }
- } else if (rv < 0) {
- int errn = -rv;
- // Any non timeup or interrupted error is critical
- if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
- if (errn > Status.APR_OS_START_USERERR) {
- errn -= Status.APR_OS_START_USERERR;
+ } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN)
+ || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) {
+ boolean error = false;
+ if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) &&
+ !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
+ error = true;
+ // Close socket and clear pool
+ closeSocket(desc[n*2+1]);
}
- getLog().error(sm.getString(
- "endpoint.apr.pollError",
- Integer.valueOf(errn),
- Error.strerror(errn)));
- // Destroy and reallocate the poller
- reset = true;
+ if (!error &&
+ ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) &&
+ !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
+ // Close socket and clear pool
+ error = true;
+ closeSocket(desc[n*2+1]);
+ }
+ if (!error && wrapper.pollerFlags != 0) {
+ // If socket was registered for multiple events but
+ // only some of the occurred, re-register for the
+ // remaining events.
+ // timeout is the value of System.currentTimeMillis() that
+ // was set as the point that the socket will timeout. When
+ // adding to the poller, the timeout from now in
+ // milliseconds is required.
+ // So first, subtract the current timestamp
+ if (timeout > 0) {
+ timeout = timeout - System.currentTimeMillis();
+ }
+ // If the socket should have already expired by now,
+ // re-add it with a very short timeout
+ if (timeout <= 0) {
+ timeout = 1;
+ }
+ // Should be impossible but just in case since timeout will
+ // be cast to an int.
+ if (timeout > Integer.MAX_VALUE) {
+ timeout = Integer.MAX_VALUE;
+ }
+ add(desc[n*2+1], (int) timeout, wrapper.pollerFlags);
+ }
+ } else {
+ // Unknown event
+ getLog().warn(sm.getString(
+ "endpoint.apr.pollUnknownEvent",
+ Long.valueOf(desc[n*2])));
+ // Close socket and clear pool
+ closeSocket(desc[n*2+1]);
}
}
-
- if (reset && pollerRunning) {
- // Reallocate the current poller
- int count = Poll.pollset(pollers[i], desc);
- long newPoller = allocatePoller(actualPollerSize, pool, -1);
- // Don't restore connections for now, since I have not tested it
- pollerSpace[i] = actualPollerSize;
- connectionCount.addAndGet(-count);
- Poll.destroy(pollers[i]);
- pollers[i] = newPoller;
+ } else if (rv < 0) {
+ int errn = -rv;
+ // Any non timeup or interrupted error is critical
+ if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
+ if (errn > Status.APR_OS_START_USERERR) {
+ errn -= Status.APR_OS_START_USERERR;
+ }
+ getLog().error(sm.getString(
+ "endpoint.apr.pollError",
+ Integer.valueOf(errn),
+ Error.strerror(errn)));
+ // Destroy and reallocate the poller
+ reset = true;
}
+ }
+ if (reset && pollerRunning) {
+ // Reallocate the current poller
+ int count = Poll.pollset(aprPoller, desc);
+ long newPoller = allocatePoller(pollerSize, pool, -1);
+ // Don't restore connections for now, since I have not tested it
+ connectionCount.addAndGet(-count);
+ Poll.destroy(aprPoller);
+ aprPoller = newPoller;
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
@@ -2146,7 +2049,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
- protected class SocketProcessor extends SocketProcessorBase<Long> {
+ protected class SocketProcessor extends SocketProcessorBase<Long> {
public SocketProcessor(SocketWrapperBase<Long> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index c1e9292..bf86274 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -45,6 +45,15 @@
issues do not "pop up" wrt. others).
-->
<section name="Tomcat 8.5.50 (markt)" rtext="in development">
+ <subsection name="Coyote">
+ <changelog>
+ <scode>
+ Refactor the APR poller to always use a single pollset now that the
+ Windows operating systems that required multiple smaller pollsets to be
+ used are no longer supported. (markt)
+ </scode>
+ </changelog>
+ </subsection>
</section>
<section name="Tomcat 8.5.49 (markt)" rtext="release in progress">
<subsection name="Catalina">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org