You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/08/22 23:45:55 UTC
svn commit: r1516628 [1/2] - in /tomcat/tc7.0.x/trunk: ./
java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/
java/org/apache/tomcat/util/net/res/ webapps/docs/config/
Author: markt
Date: Thu Aug 22 21:45:54 2013
New Revision: 1516628
URL: http://svn.apache.org/r1516628
Log:
Back-port APR/native changes required to support JSR-356
Significant changes are:
- Switch to a single Poller with multiple Pollsets
- Enable HTTP upgrade connections to use concurrent read/write
- Lots of refactoring and clean-up
Modified:
tomcat/tc7.0.x/trunk/ (props changed)
tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/res/LocalStrings.properties
tomcat/tc7.0.x/trunk/webapps/docs/config/http.xml
Propchange: tomcat/tc7.0.x/trunk/
------------------------------------------------------------------------------
Merged /tomcat/trunk:r1433976,1434403,1434428,1434438,1434447,1434456,1434463,1434500,1434598,1434725,1441807,1441916,1441920,1459523-1459524,1460107,1476972,1481164,1485489,1485495,1485611,1485847,1488793,1513665
Modified: tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=1516628&r1=1516627&r2=1516628&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java Thu Aug 22 21:45:54 2013
@@ -82,9 +82,6 @@ public class Http11AprProtocol extends A
public void setPollerSize(int pollerSize) { endpoint.setMaxConnections(pollerSize); }
public int getPollerSize() { return endpoint.getMaxConnections(); }
- public void setPollerThreadCount(int pollerThreadCount) { ((AprEndpoint)endpoint).setPollerThreadCount(pollerThreadCount); }
- public int getPollerThreadCount() { return ((AprEndpoint)endpoint).getPollerThreadCount(); }
-
public int getSendfileSize() { return ((AprEndpoint)endpoint).getSendfileSize(); }
public void setSendfileSize(int sendfileSize) { ((AprEndpoint)endpoint).setSendfileSize(sendfileSize); }
@@ -249,8 +246,7 @@ public class Http11AprProtocol extends A
if (addToPoller && proto.endpoint.isRunning()) {
((AprEndpoint)proto.endpoint).getPoller().add(
socket.getSocket().longValue(),
- proto.endpoint.getKeepAliveTimeout(),
- true, false);
+ proto.endpoint.getKeepAliveTimeout(), true, false);
}
}
@@ -271,19 +267,23 @@ public class Http11AprProtocol extends A
} else if (processor.isComet()) {
// Comet
if (proto.endpoint.isRunning()) {
- ((AprEndpoint) proto.endpoint).getCometPoller().add(
+ socket.setComet(true);
+ ((AprEndpoint) proto.endpoint).getPoller().add(
socket.getSocket().longValue(),
- proto.endpoint.getSoTimeout(),
- true, false);
+ proto.endpoint.getSoTimeout(), true, false);
} else {
// Process a STOP directly
((AprEndpoint) proto.endpoint).processSocket(
socket.getSocket().longValue(),
SocketStatus.STOP);
}
- } else {
+ } else if (processor.isUpgrade()) {
// Upgraded
((AprEndpoint) proto.endpoint).getPoller().add(
+ socket.getSocket().longValue(), -1, true, false);
+ } else {
+ // Tomcat 7 proprietary upgrade
+ ((AprEndpoint) proto.endpoint).getPoller().add(
socket.getSocket().longValue(),
processor.getUpgradeInbound().getReadTimeout(),
true, false);
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1516628&r1=1516627&r2=1516628&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Aug 22 21:45:54 2013
@@ -22,6 +22,8 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@@ -97,6 +99,9 @@ public class AprEndpoint extends Abstrac
protected ConcurrentLinkedQueue<SocketWrapper<Long>> waitingRequests =
new ConcurrentLinkedQueue<SocketWrapper<Long>>();
+ private final Map<Long,AprSocketWrapper> connections =
+ new ConcurrentHashMap<Long, AprSocketWrapper>();
+
// ------------------------------------------------------------ Constructor
public AprEndpoint() {
@@ -173,43 +178,29 @@ public class AprEndpoint extends Abstrac
/**
- * Poller thread count.
- */
- protected int pollerThreadCount = 0;
- public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
- public int getPollerThreadCount() { return pollerThreadCount; }
-
-
- /**
* The socket poller.
*/
- protected Poller[] pollers = null;
- protected int pollerRoundRobin = 0;
+ protected Poller poller = null;
public Poller getPoller() {
- pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
- return pollers[pollerRoundRobin];
+ return poller;
}
/**
- * The socket poller used for Comet support.
+ * The async timeout thread.
*/
- protected Poller[] cometPollers = null;
- protected int cometPollerRoundRobin = 0;
- public Poller getCometPoller() {
- cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length;
- return cometPollers[cometPollerRoundRobin];
+ protected AsyncTimeout asyncTimeout = null;
+ public AsyncTimeout getAsyncTimeout() {
+ return asyncTimeout;
}
/**
* The static file sender.
*/
- protected Sendfile[] sendfiles = null;
- protected int sendfileRoundRobin = 0;
+ protected Sendfile sendfile = null;
public Sendfile getSendfile() {
- sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;
- return sendfiles[sendfileRoundRobin];
+ return sendfile;
}
@@ -368,15 +359,11 @@ public class AprEndpoint extends Abstrac
* Number of keepalive sockets.
*/
public int getKeepAliveCount() {
- if (pollers == null) {
+ if (poller == null) {
return 0;
}
- int keepAliveCount = 0;
- for (int i = 0; i < pollers.length; i++) {
- keepAliveCount += pollers[i].getKeepAliveCount();
- }
- return keepAliveCount;
+ return poller.getConnectionCount();
}
@@ -384,15 +371,11 @@ public class AprEndpoint extends Abstrac
* Number of sendfile sockets.
*/
public int getSendfileCount() {
- if (sendfiles == null) {
+ if (sendfile == null) {
return 0;
}
- int sendfileCount = 0;
- for (int i = 0; i < sendfiles.length; i++) {
- sendfileCount += sendfiles[i].getSendfileCount();
- }
- return sendfileCount;
+ return sendfile.getSendfileCount();
}
@@ -460,35 +443,11 @@ public class AprEndpoint extends Abstrac
useSendfile = false;
}
- // Initialize thread count defaults for acceptor, poller and sendfile
+ // Initialize thread count default for acceptor
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
- if (pollerThreadCount == 0) {
- if ((OS.IS_WIN32 || OS.IS_WIN64) && (getMaxConnections() > 1024)) {
- // The maximum per poller to get reasonable performance is 1024
- pollerThreadCount = getMaxConnections() / 1024;
- // Adjust poller size so that it won't reach the limit
- setMaxConnections(
- getMaxConnections() - (getMaxConnections() % 1024));
- } else {
- // No explicit poller size limitation
- pollerThreadCount = 1;
- }
- }
- if (sendfileThreadCount == 0) {
- if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) {
- // The maximum per poller to get reasonable performance is 1024
- sendfileThreadCount = sendfileSize / 1024;
- // Adjust poller size so that it won't reach the limit
- sendfileSize = sendfileSize - (sendfileSize % 1024);
- } else {
- // No explicit poller size limitation
- // FIXME: Default to one per CPU ?
- sendfileThreadCount = 1;
- }
- }
// Delay accepting of new connections until data is available
// Only Linux kernels 2.4 + have that implemented
@@ -627,45 +586,30 @@ public class AprEndpoint extends Abstrac
initializeConnectionLatch();
- // Start poller threads
- pollers = new Poller[pollerThreadCount];
- for (int i = 0; i < pollerThreadCount; i++) {
- pollers[i] = new Poller(false);
- pollers[i].init();
- pollers[i].setName(getName() + "-Poller-" + i);
- pollers[i].setPriority(threadPriority);
- pollers[i].setDaemon(true);
- pollers[i].start();
- }
-
- // Start comet poller threads
- cometPollers = new Poller[pollerThreadCount];
- for (int i = 0; i < pollerThreadCount; i++) {
- cometPollers[i] = new Poller(true);
- cometPollers[i].init();
- cometPollers[i].setName(getName() + "-CometPoller-" + i);
- cometPollers[i].setPriority(threadPriority);
- cometPollers[i].setDaemon(true);
- cometPollers[i].start();
- }
+ // Start poller thread
+ poller = new Poller();
+ poller.init();
+ Thread pollerThread = new Thread(poller, getName() + "-Poller");
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
- // Start sendfile threads
+ // Start sendfile thread
if (useSendfile) {
- sendfiles = new Sendfile[sendfileThreadCount];
- for (int i = 0; i < sendfileThreadCount; i++) {
- sendfiles[i] = new Sendfile();
- sendfiles[i].init();
- sendfiles[i].setName(getName() + "-Sendfile-" + i);
- sendfiles[i].setPriority(threadPriority);
- sendfiles[i].setDaemon(true);
- sendfiles[i].start();
- }
+ sendfile = new Sendfile();
+ sendfile.init();
+ Thread sendfileThread =
+ new Thread(sendfile, getName() + "-Sendfile");
+ sendfileThread.setPriority(threadPriority);
+ sendfileThread.setDaemon(true);
+ sendfileThread.start();
}
startAcceptorThreads();
// Start async timeout thread
- Thread timeoutThread = new Thread(new AsyncTimeout(),
+ asyncTimeout = new AsyncTimeout();
+ Thread timeoutThread = new Thread(asyncTimeout,
getName() + "-AsyncTimeout");
timeoutThread.setPriority(threadPriority);
timeoutThread.setDaemon(true);
@@ -685,6 +629,8 @@ public class AprEndpoint extends Abstrac
}
if (running) {
running = false;
+ poller.stop();
+ asyncTimeout.stop();
unlockAccept();
for (AbstractEndpoint.Acceptor acceptor : acceptors) {
long waitLeft = 10000;
@@ -694,8 +640,7 @@ public class AprEndpoint extends Abstrac
try {
Thread.sleep(50);
} catch (InterruptedException e) {
- // Ignore and clean the interrupt flag
- Thread.interrupted();
+ // Ignore
}
waitLeft -= 50;
}
@@ -710,31 +655,20 @@ public class AprEndpoint extends Abstrac
}
}
}
- for (int i = 0; i < pollers.length; i++) {
- try {
- pollers[i].destroy();
- } catch (Exception e) {
- // Ignore
- }
+ try {
+ poller.destroy();
+ } catch (Exception e) {
+ // Ignore
}
- pollers = null;
- for (int i = 0; i < cometPollers.length; i++) {
+ poller = null;
+ connections.clear();
+ if (useSendfile) {
try {
- cometPollers[i].destroy();
+ sendfile.destroy();
} catch (Exception e) {
// Ignore
}
- }
- cometPollers = null;
- if (useSendfile) {
- for (int i = 0; i < sendfiles.length; i++) {
- try {
- sendfiles[i].destroy();
- } catch (Exception e) {
- // Ignore
- }
- }
- sendfiles = null;
+ sendfile = null;
}
}
shutdownExecutor();
@@ -842,16 +776,22 @@ public class AprEndpoint extends Abstrac
}
}
-
/**
- * Process given socket.
+ * Process given socket. This is called when the socket has been
+ * accepted.
*/
protected boolean processSocketWithOptions(long socket) {
try {
// During shutdown, executor may be null - avoid NPE
if (running) {
- SocketWrapper<Long> wrapper =
- new SocketWrapper<Long>(Long.valueOf(socket));
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("endpoint.debug.socket",
+ Long.valueOf(socket)));
+ }
+ AprSocketWrapper wrapper =
+ new AprSocketWrapper(Long.valueOf(socket));
+ wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
+ connections.put(Long.valueOf(socket), wrapper);
getExecutor().execute(new SocketWithOptionsProcessor(wrapper));
}
} catch (RejectedExecutionException x) {
@@ -869,9 +809,10 @@ public class AprEndpoint extends Abstrac
/**
- * Process given socket.
+ * Process given socket. Called in non-comet mode, typically keep alive
+ * or upgraded protocol.
*/
- protected boolean processSocket(long socket) {
+ public boolean processSocket(long socket, SocketStatus status) {
try {
Executor executor = getExecutor();
if (executor == null) {
@@ -879,8 +820,11 @@ public class AprEndpoint extends Abstrac
Long.valueOf(socket), null));
} else {
SocketWrapper<Long> wrapper =
- new SocketWrapper<Long>(Long.valueOf(socket));
- executor.execute(new SocketProcessor(wrapper, null));
+ connections.get(Long.valueOf(socket));
+ // Make sure connection hasn't been closed
+ if (wrapper != null) {
+ executor.execute(new SocketProcessor(wrapper, status));
+ }
}
} catch (RejectedExecutionException x) {
log.warn("Socket processing request was rejected for:"+socket,x);
@@ -896,33 +840,6 @@ public class AprEndpoint extends Abstrac
}
- /**
- * Process given socket for an event.
- */
- public boolean processSocket(long socket, SocketStatus status) {
- try {
- Executor executor = getExecutor();
- if (executor == null) {
- log.warn(sm.getString("endpoint.warn.noExector",
- Long.valueOf(socket), status));
- } else {
- SocketWrapper<Long> wrapper =
- new SocketWrapper<Long>(Long.valueOf(socket));
- executor.execute(new SocketEventProcessor(wrapper, status));
- }
- } catch (RejectedExecutionException x) {
- log.warn("Socket processing request was rejected for:"+socket,x);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
-
public boolean processSocketAsync(SocketWrapper<Long> socket,
SocketStatus status) {
try {
@@ -980,10 +897,20 @@ public class AprEndpoint extends Abstrac
// countDownConnection(). Once the connector is stopped, the latch is
// removed so it does not matter that destroySocket() does not call
// countDownConnection() in that case
+ connections.remove(Long.valueOf(socket));
destroySocket(socket, running);
}
private void destroySocket(long socket, boolean doIt) {
+ if (log.isDebugEnabled()) {
+ String msg = sm.getString("endpoint.debug.destroySocket",
+ Long.valueOf(socket), Boolean.valueOf(doIt));
+ if (log.isTraceEnabled()) {
+ log.trace(msg, new Exception());
+ } else {
+ log.debug(msg);
+ }
+ }
// Be VERY careful if you call this method directly. If it is called
// twice for the same socket the JVM will core. Currently this is only
// called from Poller.closePollset() to ensure kept alive connections
@@ -999,7 +926,6 @@ public class AprEndpoint extends Abstrac
return log;
}
-
// --------------------------------------------------- Acceptor Inner Class
/**
* The background thread that listens for incoming TCP/IP connections and
@@ -1092,6 +1018,9 @@ public class AprEndpoint extends Abstrac
* Async timeout thread
*/
protected class AsyncTimeout implements Runnable {
+
+ private volatile boolean asyncTimeoutRunning = true;
+
/**
* The background thread that checks async requests and fires the
* timeout if there has been no activity.
@@ -1100,7 +1029,7 @@ public class AprEndpoint extends Abstrac
public void run() {
// Loop until we receive a shutdown command
- while (running) {
+ while (asyncTimeoutRunning) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -1121,7 +1050,7 @@ public class AprEndpoint extends Abstrac
}
// Loop if endpoint is paused
- while (paused && running) {
+ while (paused && asyncTimeoutRunning) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -1131,106 +1060,346 @@ public class AprEndpoint extends Abstrac
}
}
+
+
+ protected void stop() {
+ asyncTimeoutRunning = false;
+ }
}
- // ----------------------------------------------------- Poller Inner Class
- /**
- * Poller class.
- */
- public class Poller extends Thread {
+ // -------------------------------------------------- SocketInfo Inner Class
+
+    /**
+     * Mutable holder describing one entry of a {@link SocketList}: the native
+     * socket pointer, its timeout and the poll event flags requested for it.
+     */
+    public static class SocketInfo {
+        /** Native socket pointer. */
+        public long socket;
+        /**
+         * Timeout for the socket; presumably milliseconds, since the poller
+         * adds it to System.currentTimeMillis().
+         */
+        public int timeout;
+        /** Bit mask built from Poll.APR_POLLIN and Poll.APR_POLLOUT. */
+        public int flags;
+
+        /**
+         * @return <code>true</code> if the read (APR_POLLIN) flag is set
+         */
+        public boolean read() {
+            return (flags & Poll.APR_POLLIN) == Poll.APR_POLLIN;
+        }
+
+        /**
+         * @return <code>true</code> if the write (APR_POLLOUT) flag is set
+         */
+        public boolean write() {
+            return (flags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT;
+        }
+
+        /**
+         * Combine two flag sets. Only the read and write bits are kept; any
+         * other bit present in either argument is dropped.
+         *
+         * @param flag1 first flag set
+         * @param flag2 second flag set
+         * @return the union of the read/write bits of both arguments
+         */
+        public static int merge(int flag1, int flag2) {
+            return ((flag1 & Poll.APR_POLLIN) | (flag2 & Poll.APR_POLLIN))
+                | ((flag1 & Poll.APR_POLLOUT) | (flag2 & Poll.APR_POLLOUT));
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Socket: [");
+            sb.append(socket);
+            sb.append("], timeout: [");
+            sb.append(timeout);
+            sb.append("], flags: [");
+            sb.append(flags);
+            // Close the last bracket so the output is balanced
+            sb.append("]");
+            return sb.toString();
+        }
+    }
+
+
+ // ---------------------------------------------- SocketTimeouts Inner Class
+
+ public class SocketTimeouts { // Unordered, array-backed store of (socket, timeout date) pairs
+ protected int size; // Number of entries currently in use
+
+ protected long[] sockets; // Socket pointers; only indexes [0, size) are valid
+ protected long[] timeouts; // Dates compared against the argument of check(), parallel to sockets
+ protected int pos = 0; // Scan cursor used by check(); kept between calls
+
+ public SocketTimeouts(int size) { // The parameter is the capacity of the arrays
+ this.size = 0; // The store starts empty; the parameter only shadows the field
+ sockets = new long[size];
+ timeouts = new long[size];
+ }
+
+ public void add(long socket, long timeout) { // Append an entry; there is no capacity or duplicate check
+ sockets[size] = socket;
+ timeouts[size] = timeout;
+ size++;
+ }
+
+ public boolean remove(long socket) { // Remove the first matching entry; returns false if not found
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ sockets[i] = sockets[size - 1]; // Fill the hole with the last entry (order is not preserved)
+ timeouts[i] = timeouts[size - 1];
+ size--;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public long check(long date) { // Return and remove the next entry whose timeout is <= date, or 0 if none is left
+ while (pos < size) {
+ if (date >= timeouts[pos]) {
+ long result = sockets[pos];
+ sockets[pos] = sockets[size - 1]; // Swap-remove; pos is not advanced so the moved entry is examined next
+ timeouts[pos] = timeouts[size - 1];
+ size--;
+ return result;
+ }
+ pos++;
+ }
+ pos = 0; // Scan finished: restart from the beginning on the next call
+ return 0; // 0 is the "nothing timed out" marker
+ }
+
+ }
+
- public static final int FLAGS_READ = Poll.APR_POLLIN;
- public static final int FLAGS_WRITE = Poll.APR_POLLOUT;
+ // -------------------------------------------------- SocketList Inner Class
- // Need two pollsets since the socketTimeout and the keep-alive timeout
- // can have different values.
- private long connectionPollset = 0;
- private long pool = 0;
- private long[] desc;
+ public class SocketList { // Fixed-capacity list of (socket, timeout, flags) entries with a read cursor
+ protected int size; // Number of entries currently in use
+ protected int pos; // Read cursor used by get()
- private long[] addSocket;
- private int[] addSocketTimeout;
- private int[] addSocketFlags;
+ protected long[] sockets; // Socket pointers; only indexes [0, size) are valid
+ protected int[] timeouts; // Timeouts, parallel to sockets
+ protected int[] flags; // Poll flags, parallel to sockets
- private volatile int addCount = 0;
+ protected SocketInfo info = new SocketInfo(); // Single instance reused for every get() result
- private boolean comet = true;
+ public SocketList(int size) { // The parameter is the capacity of the arrays
+ this.size = 0; // The list starts empty; the parameter only shadows the field
+ pos = 0;
+ sockets = new long[size];
+ timeouts = new int[size];
+ flags = new int[size];
+ }
- protected volatile int keepAliveCount = 0;
- public int getKeepAliveCount() { return keepAliveCount; }
+ public int size() {
+ return this.size;
+ }
+
+ public SocketInfo get() { // Return the entry at the cursor and advance, or null at the end of the list
+ if (pos == size) {
+ return null;
+ } else {
+ info.socket = sockets[pos];
+ info.timeout = timeouts[pos];
+ info.flags = flags[pos];
+ pos++;
+ return info; // Shared object: it is overwritten by the next call to get()
+ }
+ }
+
+ public void clear() { // Empty the list and rewind the cursor; the arrays are not zeroed
+ size = 0;
+ pos = 0;
+ }
+
+ public boolean add(long socket, int timeout, int flag) { // Returns false if the list is full
+ if (size == sockets.length) { // Checked first, so a full list rejects even a socket it already holds
+ return false;
+ } else {
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ flags[i] = SocketInfo.merge(flags[i], flag); // Already queued: merge flags, keep the existing timeout
+ return true;
+ }
+ }
+ sockets[size] = socket;
+ timeouts[size] = timeout;
+ flags[size] = flag;
+ size++;
+ return true;
+ }
+ }
- public Poller(boolean comet) {
- this.comet = comet;
+ public void duplicate(SocketList copy) { // Copy size, cursor and the used entries into copy
+ copy.size = size;
+ copy.pos = pos;
+ System.arraycopy(sockets, 0, copy.sockets, 0, size); // The arrays of copy must hold at least size entries
+ System.arraycopy(timeouts, 0, copy.timeouts, 0, size);
+ System.arraycopy(flags, 0, copy.flags, 0, size);
}
+ }
+
+ // ------------------------------------------------------ Poller Inner Class
+
+ public class Poller implements Runnable {
+
+ /**
+ * Pointers to the pollers.
+ */
+ protected long[] pollers = null;
+
+ /**
+ * Actual poller size.
+ */
+ protected int actualPollerSize = 0;
+
+ /**
+ * Amount of spots left in the poller.
+ */
+ protected int[] pollerSpace = null;
+
+ /**
+ * Amount of low level pollers in use by this poller.
+ */
+ protected int pollerCount;
+
+ /**
+ * Timeout value for the poll call.
+ */
+ protected int pollerTime;
+
+ /**
+ * Root pool.
+ */
+ protected long pool = 0;
+
+ /**
+ * Socket descriptors.
+ */
+ protected long[] desc;
+
+ /**
+ * List of sockets to be added to the poller.
+ */
+ protected SocketList addList = null;
+
+ /**
+ * Local copy of addList that the poller thread processes outside the lock.
+ */
+ protected SocketList localAddList = null;
+
+ /**
+ * Structure used for storing timeouts.
+ */
+ protected SocketTimeouts timeouts = null;
+
+
+ /**
+ * Last run of maintain. Maintain will run usually every 5s.
+ */
+ protected long lastMaintain = System.currentTimeMillis();
+
+
+ /**
+ * Amount of connections inside this poller.
+ */
+ protected int connectionCount = 0;
+ public int getConnectionCount() { return connectionCount; }
+
+
+ private volatile boolean pollerRunning = true;
+
/**
* Create the poller. With some versions of APR, the maximum poller size
* will be 62 (recompiling APR is necessary to remove this limitation).
*/
protected void init() {
+
pool = Pool.create(serverSockPool);
- int size = getMaxConnections() / pollerThreadCount;
- int socketTimeout = socketProperties.getSoTimeout();
- connectionPollset = allocatePoller(size, pool, socketTimeout);
- if (connectionPollset == 0 && size > 1024) {
- size = 1024;
- connectionPollset = allocatePoller(size, pool, socketTimeout);
+
+ // Single poller by default
+ int defaultPollerSize = getMaxConnections();
+
+ if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) {
+ // The maximum per poller to get reasonable performance is 1024
+ // Adjust poller size so that it won't reach the limit. This is
+ // a limitation of XP / Server 2003 that has been fixed in
+ // Vista / Server 2008 onwards.
+ actualPollerSize = 1024;
+ } else {
+ actualPollerSize = defaultPollerSize;
}
- if (connectionPollset == 0) {
- size = 62;
- connectionPollset = allocatePoller(size, pool, socketTimeout);
+
+ timeouts = new SocketTimeouts(defaultPollerSize);
+
+ // At the moment, setting the timeout is useless, but it could get
+ // used again as the normal poller could be faster using maintain.
+ // It might not be worth bothering though.
+ long pollset = allocatePoller(actualPollerSize, pool, -1);
+ if (pollset == 0 && actualPollerSize > 1024) {
+ actualPollerSize = 1024;
+ pollset = allocatePoller(actualPollerSize, pool, -1);
}
- desc = new long[size * 2];
- keepAliveCount = 0;
- addSocket = new long[size];
- addSocketTimeout = new int[size];
- addSocketFlags = new int[size];
- addCount = 0;
+ if (pollset == 0) {
+ actualPollerSize = 62;
+ pollset = allocatePoller(actualPollerSize, pool, -1);
+ }
+
+ pollerCount = defaultPollerSize / actualPollerSize;
+ pollerTime = pollTime / pollerCount;
+
+ pollers = new long[pollerCount];
+ pollers[0] = pollset;
+ for (int i = 1; i < pollerCount; i++) {
+ pollers[i] = allocatePoller(actualPollerSize, pool, -1);
+ }
+
+ pollerSpace = new int[pollerCount];
+ for (int i = 0; i < pollerCount; i++) {
+ pollerSpace[i] = actualPollerSize;
+ }
+
+ desc = new long[actualPollerSize * 2];
+ connectionCount = 0;
+ addList = new SocketList(defaultPollerSize);
+ localAddList = new SocketList(defaultPollerSize);
}
+
+ /*
+ * This method is synchronized so that it is not possible for a socket
+ * to be added to the Poller's addList once this method has completed.
+ */
+ protected synchronized void stop() {
+ pollerRunning = false;
+ }
+
+
/**
* Destroy the poller.
*/
- @Override
- public void destroy() {
- // Close all sockets in the add queue
- for (int i = 0; i < addCount; i++) {
- if (comet) {
- processSocket(addSocket[i], SocketStatus.STOP);
- } else {
- destroySocket(addSocket[i]);
- }
- }
- // Close all sockets still in the poller
- closePollset(connectionPollset);
- Pool.destroy(pool);
- keepAliveCount = 0;
- addCount = 0;
+ protected void destroy() {
+ // Wait for pollerTime before doing anything, so that the poller
+ // threads exit, otherwise parallel destruction of sockets which are
+ // still in the poller can cause problems
try {
- while (this.isAlive()) {
- this.interrupt();
- this.join(1000);
+ synchronized (this) {
+ this.notify();
+ this.wait(pollTime / 1000);
}
} catch (InterruptedException e) {
// Ignore
}
- }
-
- private void closePollset(long pollset) {
- int rv = Poll.pollset(pollset, desc);
- if (rv > 0) {
- for (int n = 0; n < rv; n++) {
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.STOP);
- } else {
- destroySocket(desc[n*2+1], true);
+ // Close all sockets in the add queue
+ SocketInfo info = addList.get();
+ while (info != null) {
+ boolean comet =
+ connections.get(Long.valueOf(info.socket)).isComet();
+ if (!comet || (comet && !processSocket(
+ info.socket, SocketStatus.STOP))) {
+ destroySocket(info.socket);
+ }
+ info = addList.get();
+ }
+ addList.clear();
+ // Close all sockets still in the poller
+ for (int i = 0; i < pollerCount; i++) {
+ int rv = Poll.pollset(pollers[i], desc);
+ if (rv > 0) {
+ for (int n = 0; n < rv; n++) {
+ boolean comet = connections.get(
+ Long.valueOf(desc[n*2+1])).isComet();
+ if (!comet || (comet && !processSocket(
+ desc[n*2+1], SocketStatus.STOP))) {
+ destroySocket(desc[n*2+1], true);
+ }
}
}
}
+ Pool.destroy(pool);
+ connectionCount = 0;
}
-
+
/**
* Add specified socket and associated pool to the poller. The socket
* will be added to a temporary array, and polled first after a maximum
@@ -1251,120 +1420,354 @@ public class AprEndpoint extends Abstrac
(write ? Poll.APR_POLLOUT : 0));
}
- /**
- * @deprecated Use {@link #add(long, int, boolean, boolean)}. This
- * method will be made private in Tomcat 8
- */
- @Deprecated
- public void add(long socket, int timeout, int flags) {
+ private void add(long socket, int timeout, int flags) {
+ if (log.isDebugEnabled()) {
+ String msg = sm.getString("endpoint.debug.pollerAdd",
+ Long.valueOf(socket), Integer.valueOf(timeout),
+ Integer.valueOf(flags));
+ if (log.isTraceEnabled()) {
+ log.trace(msg, new Exception());
+ } else {
+ log.debug(msg);
+ }
+ }
+ if (timeout <= 0) {
+ // Always put a timeout in
+ timeout = Integer.MAX_VALUE;
+ }
+ boolean ok = false;
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
- // at most for pollTime before being polled
- if (addCount >= addSocket.length) {
- // Can't do anything: close the socket right away
- if (comet) {
- processSocket(socket, SocketStatus.ERROR);
- } else {
- destroySocket(socket);
+ // at most for pollTime before being polled. Don't add the
+ // socket once the poller has stopped but destroy it straight
+ // away
+ if (pollerRunning && addList.add(socket, timeout, flags)) {
+ ok = true;
+ this.notify();
+ }
+ }
+ if (!ok) {
+ // Can't do anything: close the socket right away
+ boolean comet = connections.get(
+ Long.valueOf(socket)).isComet();
+ if (!comet || (comet && !processSocket(
+ socket, SocketStatus.ERROR))) {
+ destroySocket(socket);
+ }
+ }
+ }
+
+ /**
+ * Add specified socket to one of the pollers.
+ */
+ protected boolean addToPoller(long socket, int events) {
+ int rv = -1;
+ for (int i = 0; i < pollers.length; i++) {
+ if (pollerSpace[i] > 0) {
+ rv = Poll.add(pollers[i], socket, events);
+ if (rv == Status.APR_SUCCESS) {
+ pollerSpace[i]--;
+ connectionCount++;
+ return true;
}
- return;
}
- addSocket[addCount] = socket;
- addSocketTimeout[addCount] = timeout;
- addSocketFlags[addCount] = flags;
- addCount++;
- this.notify();
}
+ return false;
+ }
+
+ /**
+ * Remove specified socket from the pollers. Only pollsets that are not
+ * empty according to pollerSpace are tried, and the search stops at the
+ * first one for which Poll.remove returns anything other than
+ * APR_NOTFOUND. For that pollset the free space is incremented and the
+ * connection count decremented, even if the result was an error code.
+ *
+ * @param socket the socket pointer to remove
+ * @return <code>true</code> only if the last Poll.remove call returned
+ *         APR_SUCCESS
+ */
+ protected boolean removeFromPoller(long socket) {
+ int rv = -1;
+ for (int i = 0; i < pollers.length; i++) {
+ if (pollerSpace[i] < actualPollerSize) {
+ rv = Poll.remove(pollers[i], socket);
+ if (rv != Status.APR_NOTFOUND) {
+ pollerSpace[i]++;
+ connectionCount--;
+ break;
+ }
+ }
+ }
+ return (rv == Status.APR_SUCCESS);
+ }
+
+        /**
+         * Timeout checks. Does nothing if the previous run was less than
+         * five seconds ago. Otherwise every socket reported as expired by
+         * the timeouts structure is removed from the pollers and then either
+         * handed to processSocket with SocketStatus.TIMEOUT (Comet sockets)
+         * or destroyed. A Comet socket is also destroyed if processSocket
+         * returns <code>false</code>.
+         */
+        protected void maintain() {
+
+            long date = System.currentTimeMillis();
+            // Maintain runs at most once every 5s, although it will likely get
+            // called more
+            if ((date - lastMaintain) < 5000L) {
+                return;
+            }
+            lastMaintain = date;
+
+            long socket = timeouts.check(date);
+            while (socket != 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("endpoint.debug.socketTimeout",
+                            Long.valueOf(socket)));
+                }
+                removeFromPoller(socket);
+                AprSocketWrapper wrapper =
+                        connections.get(Long.valueOf(socket));
+                // No wrapper means the connection is no longer registered,
+                // presumably because destroySocket(long) already ran for it.
+                // Skip it rather than fail with a NullPointerException (which
+                // would abort the rest of this sweep) or destroy it a second
+                // time. processSocket(long, SocketStatus) ignores unknown
+                // sockets in the same way.
+                if (wrapper != null) {
+                    if (!wrapper.isComet() || !processSocket(
+                            socket, SocketStatus.TIMEOUT)) {
+                        destroySocket(socket);
+                    }
+                }
+                socket = timeouts.check(date);
+            }
+
+        }
+
+        /**
+         * Displays the list of sockets in the pollers. Each pollset is
+         * queried into a buffer local to this call, so the descriptor array
+         * used by the poller thread is neither read nor modified.
+         *
+         * @return "Poller" followed by one bracketed, space separated list
+         *         of socket pointers per pollset
+         */
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            buf.append("Poller");
+            long[] res = new long[actualPollerSize * 2];
+            for (int i = 0; i < pollers.length; i++) {
+                int count = Poll.pollset(pollers[i], res);
+                buf.append(" [ ");
+                for (int j = 0; j < count; j++) {
+                    // Read the array that was just filled; the socket is
+                    // the second value of each descriptor pair
+                    buf.append(res[2*j+1]).append(" ");
+                }
+                buf.append("]");
+            }
+            return buf.toString();
+        }
/**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
+ * The background thread that listens for incoming TCP/IP connections
+ * and hands them off to an appropriate processor.
*/
@Override
public void run() {
- long maintainTime = 0;
+ int maintain = 0;
// Loop until we receive a shutdown command
- while (running) {
+ while (pollerRunning) {
+
// Loop if endpoint is paused
- while (paused && running) {
+ while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
-
- if (!running) {
- break;
- }
- if (keepAliveCount < 1 && addCount < 1) {
- synchronized (this) {
- while (keepAliveCount < 1 && addCount < 1 && running) {
- // Reset maintain time.
- maintainTime = 0;
- try {
- this.wait();
- } catch (InterruptedException e) {
- // Ignore
- }
+ // Check timeouts if the poller is empty
+ while (pollerRunning && connectionCount < 1 &&
+ addList.size() < 1) {
+ // Reset maintain time.
+ try {
+ if (getSoTimeout() > 0 && pollerRunning) {
+ maintain();
}
+ synchronized (this) {
+ this.wait(10000);
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ getLog().warn(sm.getString("endpoint.timeout.err"));
}
}
- if (!running) {
- break;
- }
try {
// Add sockets which are waiting to the poller
- if (addCount > 0) {
+ if (addList.size() > 0) {
synchronized (this) {
- int successCount = 0;
- try {
- for (int i = (addCount - 1); i >= 0; i--) {
- int timeout = addSocketTimeout[i];
- if (timeout > 0) {
- // Convert milliseconds to microseconds
- timeout = timeout * 1000;
+ // Duplicate to another list, so that the syncing is
+ // minimal
+ addList.duplicate(localAddList);
+ addList.clear();
+ }
+ SocketInfo info = localAddList.get();
+ while (info != null) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString(
+ "endpoint.debug.pollerAddDo",
+ Long.valueOf(info.socket)));
+ }
+ timeouts.remove(info.socket);
+ AprSocketWrapper wrapper = connections.get(
+ Long.valueOf(info.socket));
+ if (wrapper == null) {
+ continue;
+ }
+ if (info.read() || info.write()) {
+ boolean comet = wrapper.isComet();
+ if (comet || wrapper.pollerFlags != 0) {
+ removeFromPoller(info.socket);
+ }
+ wrapper.pollerFlags = wrapper.pollerFlags |
+ (info.read() ? Poll.APR_POLLIN : 0) |
+ (info.write() ? Poll.APR_POLLOUT : 0);
+ if (!addToPoller(info.socket, wrapper.pollerFlags)) {
+ // Can't do anything: close the socket right
+ // away
+ if (!comet || (comet && !processSocket(
+ info.socket, SocketStatus.ERROR))) {
+ destroySocket(info.socket);
}
- int rv = Poll.addWithTimeout(
- connectionPollset, addSocket[i],
- addSocketFlags[i], timeout);
- if (rv == Status.APR_SUCCESS) {
- successCount++;
+ } else {
+ timeouts.add(info.socket,
+ System.currentTimeMillis() +
+ info.timeout);
+ }
+ } else {
+ // Should never happen.
+ destroySocket(info.socket);
+ getLog().warn(sm.getString(
+ "endpoint.apr.pollAddInvalid", info));
+ }
+ info = localAddList.get();
+ }
+ }
+
+ // Poll for the specified interval
+ for (int i = 0; i < pollers.length; i++) {
+
+ // Flags to ask to reallocate the pool
+ boolean reset = false;
+ //ArrayList<Long> skip = null;
+
+ int rv = 0;
+ // Iterate over each poller, but no need to poll empty pollers
+ if (pollerSpace[i] < actualPollerSize) {
+ rv = Poll.poll(pollers[i], pollerTime, desc, true);
+ }
+ if (rv > 0) {
+ pollerSpace[i] += rv;
+ connectionCount -= rv;
+ for (int n = 0; n < rv; n++) {
+ timeouts.remove(desc[n*2+1]);
+ AprSocketWrapper wrapper = connections.get(
+ Long.valueOf(desc[n*2+1]));
+ if (getLog().isDebugEnabled()) {
+ log.debug(sm.getString(
+ "endpoint.debug.pollerProcess",
+ Long.valueOf(desc[n*2+1]),
+ Long.valueOf(desc[n*2])));
+ }
+ wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
+ // Check for failed sockets and hand this socket off to a worker
+ if (wrapper.isComet()) {
+ // Event processes either a read or a write depending on what the poller returns
+ if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
+ || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
+ if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+ if (wrapper.pollerFlags != 0) {
+ add(desc[n*2+1], 1, wrapper.pollerFlags);
+ }
+ if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+ if (wrapper.pollerFlags != 0) {
+ add(desc[n*2+1], 1, wrapper.pollerFlags);
+ }
+ if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
} else {
- // Can't do anything: close the socket right away
- if (comet) {
- processSocket(addSocket[i], SocketStatus.ERROR);
- } else {
- destroySocket(addSocket[i]);
+ // Unknown event
+ getLog().warn(sm.getString(
+ "endpoint.apr.pollUnknownEvent",
+ Long.valueOf(desc[n*2])));
+ if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
}
}
+ } else if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
+ || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN)
+ || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) {
+ boolean error = false;
+ if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) &&
+ !processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
+ error = true;
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ if (!error &&
+ ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) &&
+ !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ } else {
+ // Unknown event
+ getLog().warn(sm.getString(
+ "endpoint.apr.pollUnknownEvent",
+ Long.valueOf(desc[n*2])));
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ }
+ } else if (rv < 0) {
+ int errn = -rv;
+ // Any non timeup or interrupted error is critical
+ if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
+ if (errn > Status.APR_OS_START_USERERR) {
+ errn -= Status.APR_OS_START_USERERR;
}
- } finally {
- keepAliveCount += successCount;
- addCount = 0;
+ getLog().error(sm.getString(
+ "endpoint.apr.pollError",
+ Integer.valueOf(errn),
+ Error.strerror(errn)));
+ // Destroy and reallocate the poller
+ reset = true;
}
}
- }
- maintainTime += pollTime;
- // Poll for the specified interval
- if (doPoll(connectionPollset)) {
- continue;
+ if (reset) {
+ // Reallocate the current poller
+ int count = Poll.pollset(pollers[i], desc);
+ long newPoller = allocatePoller(actualPollerSize, pool, -1);
+ // Don't restore connections for now, since I have not tested it
+ pollerSpace[i] = actualPollerSize;
+ connectionCount -= count;
+ Poll.destroy(pollers[i]);
+ pollers[i] = newPoller;
+ }
+
}
- // Check timeouts (much less frequently that polling)
- if (maintainTime > 1000000L && running) {
- maintainTime = 0;
- if (socketProperties.getSoTimeout() > 0) {
- doTimeout(connectionPollset);
- }
+ // Process socket timeouts
+ if (getSoTimeout() > 0 && maintain++ > 1000 && pollerRunning) {
+ // This works and uses only one timeout mechanism for everything, but the
+ // non event poller might be a bit faster by using the old maintain.
+ maintain = 0;
+ maintain();
}
+
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
- log.error(sm.getString("endpoint.poll.error"), t);
+ if (maintain == 0) {
+ getLog().warn(sm.getString("endpoint.timeout.error"), t);
+ } else {
+ getLog().warn(sm.getString("endpoint.poll.error"), t);
+ }
}
}
@@ -1372,59 +1775,6 @@ public class AprEndpoint extends Abstrac
synchronized (this) {
this.notifyAll();
}
-
- }
-
- private boolean doPoll(long pollset) {
- int rv = Poll.poll(pollset, pollTime, desc, true);
- if (rv > 0) {
- keepAliveCount -= rv;
- for (int n = 0; n < rv; n++) {
- // Check for failed sockets and hand this socket off to a worker
- if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
- || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
- || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)))
- || (!comet && (!processSocket(desc[n*2+1])))) {
- // Close socket and clear pool
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
- } else {
- destroySocket(desc[n*2+1]);
- }
- }
- }
- } else if (rv < 0) {
- int errn = -rv;
- /* Any non timeup or interrupted error is critical */
- if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
- if (errn > Status.APR_OS_START_USERERR) {
- errn -= Status.APR_OS_START_USERERR;
- }
- log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
- // Handle poll critical failure
- synchronized (this) {
- destroy();
- init();
- }
- return true;
- }
- }
- return false;
- }
-
- private void doTimeout(long pollset) {
- int rv = Poll.maintain(pollset, desc, true);
- if (rv > 0) {
- keepAliveCount -= rv;
- for (int n = 0; n < rv; n++) {
- // Close socket and clear pool
- if (comet) {
- processSocket(desc[n], SocketStatus.TIMEOUT);
- } else {
- destroySocket(desc[n]);
- }
- }
- }
}
}
@@ -1455,56 +1805,65 @@ public class AprEndpoint extends Abstrac
// --------------------------------------------------- Sendfile Inner Class
- /**
- * Sendfile class.
- */
- public class Sendfile extends Thread {
+ public class Sendfile implements Runnable {
protected long sendfilePollset = 0;
protected long pool = 0;
protected long[] desc;
protected HashMap<Long, SendfileData> sendfileData;
- protected volatile int sendfileCount;
+ protected int sendfileCount;
public int getSendfileCount() { return sendfileCount; }
protected ArrayList<SendfileData> addS;
- protected volatile int addCount;
+
+ private volatile boolean sendfileRunning = true;
/**
- * Create the sendfile poller. With some versions of APR, the maximum poller size will
- * be 62 (recompiling APR is necessary to remove this limitation).
+ * Create the sendfile poller. With some versions of APR, the maximum
+ * poller size will be 62 (recompiling APR is necessary to remove this
+ * limitation).
*/
protected void init() {
pool = Pool.create(serverSockPool);
- int size = sendfileSize / sendfileThreadCount;
- sendfilePollset = allocatePoller(size, pool, socketProperties.getSoTimeout());
+ int size = sendfileSize;
+ if (size <= 0) {
+ size = (OS.IS_WIN32 || OS.IS_WIN64) ? (1 * 1024) : (16 * 1024);
+ }
+ sendfilePollset = allocatePoller(size, pool, getSoTimeout());
if (sendfilePollset == 0 && size > 1024) {
size = 1024;
- sendfilePollset = allocatePoller(size, pool, socketProperties.getSoTimeout());
+ sendfilePollset = allocatePoller(size, pool, getSoTimeout());
}
if (sendfilePollset == 0) {
size = 62;
- sendfilePollset = allocatePoller(size, pool, socketProperties.getSoTimeout());
+ sendfilePollset = allocatePoller(size, pool, getSoTimeout());
}
desc = new long[size * 2];
sendfileData = new HashMap<Long, SendfileData>(size);
addS = new ArrayList<SendfileData>();
- addCount = 0;
}
/**
* Destroy the poller.
*/
- @Override
- public void destroy() {
+ protected void destroy() {
+ sendfileRunning = false;
+ // Wait for polltime before doing anything, so that the poller threads
+ // exit, otherwise parallel destruction of sockets which are still
+ // in the poller can cause problems
+ try {
+ synchronized (this) {
+ this.wait(pollTime / 1000);
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
// Close any socket remaining in the add queue
- addCount = 0;
for (int i = (addS.size() - 1); i >= 0; i--) {
SendfileData data = addS.get(i);
destroySocket(data.socket);
}
- addS.clear();
// Close all sockets still in the poller
int rv = Poll.pollset(sendfilePollset, desc);
if (rv > 0) {
@@ -1514,14 +1873,6 @@ public class AprEndpoint extends Abstrac
}
Pool.destroy(pool);
sendfileData.clear();
- try {
- while (this.isAlive()) {
- this.interrupt();
- this.join(1000);
- }
- } catch (InterruptedException e) {
- // Ignore
- }
}
/**
@@ -1530,7 +1881,7 @@ public class AprEndpoint extends Abstrac
* will be handled asynchronously inside the kernel. As a result,
* the poller will never be used.
*
- * @param data containing the reference to the data which should be sent
+ * @param data containing the reference to the data which should be sent
* @return true if all the data has been sent right away, and false
* otherwise
*/
@@ -1538,13 +1889,6 @@ public class AprEndpoint extends Abstrac
// Initialize fd from data given
try {
data.fdpool = Socket.pool(data.socket);
- } catch (Exception e) {
- // Pool not created so no need to destroy it.
- log.error(sm.getString("endpoint.sendfile.error"), e);
- data.socket = 0;
- return false;
- }
- try {
data.fd = File.open
(data.fileName, File.APR_FOPEN_READ
| File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
@@ -1558,36 +1902,32 @@ public class AprEndpoint extends Abstrac
if (nw < 0) {
if (!(-nw == Status.EAGAIN)) {
Pool.destroy(data.fdpool);
- // No need to close socket, this will be done by
- // calling code since data.socket == 0
data.socket = 0;
return false;
} else {
// Break the loop and add the socket to poller.
break;
}
- }
-
- data.pos = data.pos + nw;
- if (data.pos >= data.end) {
- // Entire file has been sent
- Pool.destroy(data.fdpool);
- // Set back socket to blocking mode
- Socket.timeoutSet(data.socket, socketProperties.getSoTimeout() * 1000);
- return true;
+ } else {
+ data.pos = data.pos + nw;
+ if (data.pos >= data.end) {
+ // Entire file has been sent
+ Pool.destroy(data.fdpool);
+ // Set back socket to blocking mode
+ Socket.timeoutSet(
+ data.socket, getSoTimeout() * 1000);
+ return true;
+ }
}
}
} catch (Exception e) {
- log.error(sm.getString("endpoint.sendfile.error"), e);
- Pool.destroy(data.fdpool);
- data.socket = 0;
+ log.warn(sm.getString("endpoint.sendfile.error"), e);
return false;
}
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
synchronized (this) {
addS.add(data);
- addCount++;
this.notify();
}
return false;
@@ -1603,72 +1943,61 @@ public class AprEndpoint extends Abstrac
if (rv == Status.APR_SUCCESS) {
sendfileCount--;
}
- sendfileData.remove(Long.valueOf(data.socket));
+ sendfileData.remove(new Long(data.socket));
}
/**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
+ * The background thread that listens for incoming TCP/IP connections
+ * and hands them off to an appropriate processor.
*/
@Override
public void run() {
long maintainTime = 0;
// Loop until we receive a shutdown command
- while (running) {
+ while (sendfileRunning) {
// Loop if endpoint is paused
- while (paused && running) {
+ while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
-
- if (!running) {
- break;
- }
- if (sendfileCount < 1 && addCount < 1) {
- synchronized (this) {
- while (sendfileCount < 1 && addS.size() < 1 && running) {
- // Reset maintain time.
- maintainTime = 0;
- try {
- this.wait();
- } catch (InterruptedException e) {
- // Ignore
- }
+ // Loop if poller is empty
+ while (sendfileCount < 1 && addS.size() < 1) {
+ // Reset maintain time.
+ maintainTime = 0;
+ try {
+ synchronized (this) {
+ this.wait();
}
+ } catch (InterruptedException e) {
+ // Ignore
}
}
- if (!running) {
- break;
- }
try {
// Add socket to the poller
- if (addCount > 0) {
+ if (addS.size() > 0) {
synchronized (this) {
- int successCount = 0;
- try {
- for (int i = (addS.size() - 1); i >= 0; i--) {
- SendfileData data = addS.get(i);
- int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
- if (rv == Status.APR_SUCCESS) {
- sendfileData.put(Long.valueOf(data.socket), data);
- successCount++;
- } else {
- log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));
- // Can't do anything: close the socket right away
- destroySocket(data.socket);
- }
+ for (int i = (addS.size() - 1); i >= 0; i--) {
+ SendfileData data = addS.get(i);
+ int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
+ if (rv == Status.APR_SUCCESS) {
+ sendfileData.put(new Long(data.socket), data);
+ sendfileCount++;
+ } else {
+ getLog().warn(sm.getString(
+ "endpoint.sendfile.addfail",
+ Integer.valueOf(rv),
+ Error.strerror(rv)));
+ // Can't do anything: close the socket right away
+ destroySocket(data.socket);
}
- } finally {
- sendfileCount += successCount;
- addS.clear();
- addCount = 0;
}
+ addS.clear();
}
}
@@ -1679,7 +2008,7 @@ public class AprEndpoint extends Abstrac
for (int n = 0; n < rv; n++) {
// Get the sendfile state
SendfileData state =
- sendfileData.get(Long.valueOf(desc[n*2+1]));
+ sendfileData.get(new Long(desc[n*2+1]));
// Problem events
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
@@ -1709,12 +2038,13 @@ public class AprEndpoint extends Abstrac
if (state.keepAlive) {
// Destroy file descriptor pool, which should close the file
Pool.destroy(state.fdpool);
- Socket.timeoutSet(state.socket, socketProperties.getSoTimeout() * 1000);
- // If all done put the socket back in the poller for
- // processing of further requests
- getPoller().add(state.socket,
- getKeepAliveTimeout(),
- Poller.FLAGS_READ);
+ Socket.timeoutSet(state.socket,
+ getSoTimeout() * 1000);
+ // If all done put the socket back in the
+ // poller for processing of further requests
+ getPoller().add(
+ state.socket, getKeepAliveTimeout(),
+ true, false);
} else {
// Close the socket since this is
// the end of not keep-alive request.
@@ -1729,7 +2059,10 @@ public class AprEndpoint extends Abstrac
if (errn > Status.APR_OS_START_USERERR) {
errn -= Status.APR_OS_START_USERERR;
}
- log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
+ getLog().error(sm.getString(
+ "Unexpected poller error",
+ Integer.valueOf(errn),
+ Error.strerror(errn)));
// Handle poll critical failure
synchronized (this) {
destroy();
@@ -1739,13 +2072,14 @@ public class AprEndpoint extends Abstrac
}
}
// Call maintain for the sendfile poller
- if (socketProperties.getSoTimeout() > 0 && maintainTime > 1000000L && running) {
- rv = Poll.maintain(sendfilePollset, desc, true);
+ if (getSoTimeout() > 0 &&
+ maintainTime > 1000000L && sendfileRunning) {
+ rv = Poll.maintain(sendfilePollset, desc, false);
maintainTime = 0;
if (rv > 0) {
for (int n = 0; n < rv; n++) {
// Get the sendfile state
- SendfileData state = sendfileData.get(Long.valueOf(desc[n]));
+ SendfileData state = sendfileData.get(new Long(desc[n]));
// Close socket and clear pool
remove(state);
// Destroy file descriptor pool, which should close the file
@@ -1756,7 +2090,7 @@ public class AprEndpoint extends Abstrac
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
- log.error(sm.getString("endpoint.poll.error"), t);
+ getLog().error(sm.getString("endpoint.poll.error"), t);
}
}
@@ -1768,7 +2102,6 @@ public class AprEndpoint extends Abstrac
}
-
// ------------------------------------------------ Handler Inner Interface
@@ -1789,6 +2122,8 @@ public class AprEndpoint extends Abstrac
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool. This will also set the socket options
* and do the handshake.
+ *
+ * This is called after an accept().
*/
protected class SocketWithOptionsProcessor implements Runnable {
@@ -1806,7 +2141,7 @@ public class AprEndpoint extends Abstrac
if (!deferAccept) {
if (setSocketOptions(socket.getSocket().longValue())) {
getPoller().add(socket.getSocket().longValue(),
- getSoTimeout(), Poller.FLAGS_READ);
+ getSoTimeout(), true, false);
} else {
// Close socket and pool
destroySocket(socket.getSocket().longValue());
@@ -1848,76 +2183,73 @@ public class AprEndpoint extends Abstrac
*/
protected class SocketProcessor implements Runnable {
- protected SocketWrapper<Long> socket = null;
- protected SocketStatus status = null;
+ private final SocketWrapper<Long> socket;
+ private final SocketStatus status;
public SocketProcessor(SocketWrapper<Long> socket,
SocketStatus status) {
this.socket = socket;
+ if (status == null) {
+ // Should never happen
+ throw new NullPointerException();
+ }
this.status = status;
}
@Override
public void run() {
- synchronized (socket) {
- // Process the request from this socket
- SocketState state = SocketState.OPEN;
- if (status == null) {
- state = handler.process(socket,SocketStatus.OPEN_READ);
- } else {
- state = handler.process(socket, status);
+
+ // Upgraded connections need to allow multiple threads to access the
+ // connection at the same time to enable blocking IO to be used when
+ // Servlet 3.1 NIO has been configured
+ if (socket.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
+ synchronized (socket.getWriteThreadLock()) {
+ doRun();
}
- if (state == Handler.SocketState.CLOSED) {
- // Close socket and pool
- destroySocket(socket.getSocket().longValue());
- socket = null;
- } else if (state == Handler.SocketState.LONG) {
- socket.access();
- if (socket.async) {
- waitingRequests.add(socket);
- }
- } else if (state == Handler.SocketState.ASYNC_END) {
- socket.access();
- SocketProcessor proc = new SocketProcessor(socket, SocketStatus.OPEN_READ);
- getExecutor().execute(proc);
+ } else {
+ synchronized (socket) {
+ doRun();
}
}
}
- }
+ private void doRun() {
+ // Process the request from this socket
+ if (socket.getSocket() == null) {
+ // Closed in another thread
+ return;
+ }
+ SocketState state = handler.process(socket, status);
+ if (state == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ destroySocket(socket.getSocket().longValue());
+ socket.socket = null;
+ } else if (state == Handler.SocketState.LONG) {
+ socket.access();
+ if (socket.async) {
+ waitingRequests.add(socket);
+ }
+ } else if (state == Handler.SocketState.ASYNC_END) {
+ socket.access();
+ SocketProcessor proc = new SocketProcessor(socket,
+ SocketStatus.OPEN_READ);
+ getExecutor().execute(proc);
+ }
+ }
+ }
- // --------------------------------------- SocketEventProcessor Inner Class
+ private static class AprSocketWrapper extends SocketWrapper<Long> {
- /**
- * This class is the equivalent of the Worker, but will simply use in an
- * external Executor thread pool.
- */
- protected class SocketEventProcessor implements Runnable {
+ // This field should only be used by Poller#run()
+ private int pollerFlags = 0;
- protected SocketWrapper<Long> socket = null;
- protected SocketStatus status = null;
-
- public SocketEventProcessor(SocketWrapper<Long> socket,
- SocketStatus status) {
- this.socket = socket;
- this.status = status;
- }
-
- @Override
- public void run() {
- synchronized (socket) {
- // Process the request from this socket
- Handler.SocketState state = handler.process(socket, status);
- if (state == Handler.SocketState.CLOSED) {
- // Close socket and pool
- destroySocket(socket.getSocket().longValue());
- socket = null;
- }
- }
+ public AprSocketWrapper(Long socket) {
+ super(socket);
}
}
+
private static class PrivilegedSetTccl implements PrivilegedAction<Void> {
private ClassLoader cl;
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1516628&r1=1516627&r2=1516628&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Aug 22 21:45:54 2013
@@ -905,7 +905,7 @@ public class NioEndpoint extends Abstrac
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
//handle callback flag
- if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
+ if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
att.setCometNotify(true);
} else {
att.setCometNotify(false);
@@ -1065,7 +1065,7 @@ public class NioEndpoint extends Abstrac
try {
if ( key == null ) return;//nothing to do
KeyAttachment ka = (KeyAttachment) key.attachment();
- if (ka != null && ka.getComet() && status != null) {
+ if (ka != null && ka.isComet() && status != null) {
//the comet event takes care of clean up
//processSocket(ka.getChannel(), status, dispatch);
ka.setComet(false);//to avoid a loop
@@ -1247,7 +1247,7 @@ public class NioEndpoint extends Abstrac
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
- } else if ( attachment.getComet() ) {
+ } else if ( attachment.isComet() ) {
//check if thread is available
if ( isWorkerAvailable() ) {
//set interest ops to 0 so we don't get multiple
@@ -1449,7 +1449,7 @@ public class NioEndpoint extends Abstrac
cancelledKey(key, SocketStatus.ERROR,false); //we don't support any keys without attachments
} else if ( ka.getError() ) {
cancelledKey(key, SocketStatus.ERROR,true);//TODO this is not yet being used
- } else if (ka.getComet() && ka.getCometNotify() ) {
+ } else if (ka.isComet() && ka.getCometNotify() ) {
ka.setCometNotify(false);
reg(key,ka,0);//avoid multiple calls, this gets reregistered after invocation
//if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
@@ -1469,7 +1469,7 @@ public class NioEndpoint extends Abstrac
ka.interestOps(0); //avoid duplicate timeout calls
cancelledKey(key, SocketStatus.TIMEOUT,true);
}
- } else if (ka.isAsync() || ka.getComet()) {
+ } else if (ka.isAsync() || ka.isComet()) {
if (close) {
key.interestOps(0);
ka.interestOps(0); //avoid duplicate stop calls
@@ -1514,7 +1514,7 @@ public class NioEndpoint extends Abstrac
this.socket = channel;
this.poller = poller;
lastAccess = System.currentTimeMillis();
- comet = false;
+ setComet(false);
timeout = soTimeout;
setWriteTimeout(soTimeout);
error = false;
@@ -1551,8 +1551,6 @@ public class NioEndpoint extends Abstrac
public Poller getPoller() { return poller;}
public void setPoller(Poller poller){this.poller = poller;}
- public void setComet(boolean comet) { this.comet = comet; }
- public boolean getComet() { return comet; }
public void setCometNotify(boolean notify) { this.cometNotify = notify; }
public boolean getCometNotify() { return cometNotify; }
/**
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1516628&r1=1516627&r2=1516628&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Aug 22 21:45:54 2013
@@ -29,6 +29,7 @@ public class SocketWrapper<E> {
protected boolean error = false;
protected long lastRegistered = 0;
protected volatile int keepAliveLeft = 100;
+ private boolean comet = false;
protected boolean async = false;
protected boolean keptAlive = false;
private boolean upgraded = false;
@@ -62,6 +63,8 @@ public class SocketWrapper<E> {
return socket;
}
+ public boolean isComet() { return comet; }
+ public void setComet(boolean comet) { this.comet = comet; }
public boolean isAsync() { return async; }
public void setAsync(boolean async) { this.async = async; }
public boolean isUpgraded() { return upgraded; }
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/res/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/res/LocalStrings.properties?rev=1516628&r1=1516627&r2=1516628&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/res/LocalStrings.properties (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/res/LocalStrings.properties Thu Aug 22 21:45:54 2013
@@ -14,19 +14,24 @@
# limitations under the License.
# net resources
-endpoint.err.fatal=Endpoint {0} shutdown due to exception: {1}
-endpoint.err.nonfatal=Endpoint {0} ignored exception: {1}
-endpoint.warn.reinit=Reinitializing ServerSocket
-endpoint.warn.restart=Restarting endpoint
-endpoint.warn.security=Endpoint {0} security exception: {1}
-endpoint.err.socket=Socket error caused by remote host {0}
+endpoint.err.close=Caught exception trying to close socket
endpoint.err.handshake=Handshake failed
endpoint.err.unexpected=Unexpected error processing socket
-endpoint.warn.nullSocket=Null socket returned by accept
+endpoint.warn.noExector=Failed to process socket [{0}] in state [{1}] because the executor had already been shutdown
+endpoint.warn.noDisableCompression='Disable compression' option is not supported by the SSL library {0}
+endpoint.warn.noHonorCipherOrder='Honor cipher order' option is not supported by the SSL library {0}
+endpoint.warn.noInsecureReneg=Secure re-negotiation is not supported by the SSL library {0}
+endpoint.warn.unlockAcceptorFailed=Acceptor thread [{0}] failed to unlock. Forcing hard socket shutdown.
+endpoint.debug.channelCloseFail=Failed to close channel
+endpoint.debug.destroySocket=socket [{0}], doIt [{1}]
+endpoint.debug.pollerAdd=socket [{0}], timeout [{1}], flags [{2}]
+endpoint.debug.pollerAddDo=socket [{0}]
+endpoint.debug.pollerProcess=Processing socket [{0}] for event(s) [{1}]
+endpoint.debug.socket=socket [{0}]
+endpoint.debug.socketCloseFail=Failed to close socket
+endpoint.debug.socketTimeout=Timing out [{0}]
endpoint.debug.unlock=Caught exception trying to unlock accept on port {0}
endpoint.err.close=Caught exception trying to close socket
-endpoint.noProcessor=No Processors - worker thread dead!
-endpoint.info.maxThreads=Maximum number of threads ({0}) created for connector with address {1} and port {2}
endpoint.init.bind=Socket bind failed: [{0}] {1}
endpoint.init.listen=Socket listen failed: [{0}] {1}
@@ -40,15 +45,11 @@ endpoint.poll.error=Unexpected poller er
endpoint.process.fail=Error allocating socket processor
endpoint.sendfile.error=Unexpected sendfile error
endpoint.sendfile.addfail=Sendfile failure: [{0}] {1}
-endpoint.sendfile.nosupport=Disabling sendfile, since either the APR version or the system doesn't support it
-endpoint.warn.noDisableCompression='Disable compression' option is not supported by the SSL library {0}
-endpoint.warn.noInsecureReneg=Secure re-negotiation is not supported by the SSL library {0}
-endpoint.warn.noHonorCipherOrder='Honor cipher order' option is not supported by the SSL library {0}
-endpoint.warn.unlockAcceptorFailed=Acceptor thread [{0}] failed to unlock. Forcing hard socket shutdown.
-endpoint.warn.noHonorCipherOrder='Honor cipher order' option is not supported by the SSL library {0}
-endpoint.debug.channelCloseFail=Failed to close channel
-endpoint.debug.socketCloseFail=Failed to close socket
+endpoint.timeout.err=Error processing socket timeout
endpoint.apr.noSslCertFile=Connector attribute SSLCertificateFile must be defined when using SSL with APR
+endpoint.apr.pollAddInvalid=Invalid attempt to add a socket [{0}] to the poller
+endpoint.apr.pollError=Poller failed with error [{0}] : [{1}]
+endpoint.apr.pollUnknownEvent=A socket was returned from the poller with an unrecognized event [{0}]
endpoint.apr.invalidSslProtocol=An invalid value [{0}] was provided for the SSLProtocol attribute
endpoint.nio.selectorCloseFail=Failed to close selector when closing the poller
endpoint.warn.noExector=Failed to process socket [{0}] in state [{1}] because the executor had already been shutdown
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org