You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2016/06/23 14:22:39 UTC
svn commit: r1749898 - in /tomcat/trunk:
java/org/apache/coyote/http2/Http2Protocol.java
java/org/apache/coyote/http2/Http2UpgradeHandler.java
java/org/apache/coyote/http2/StreamProcessor.java webapps/docs/changelog.xml
Author: remm
Date: Thu Jun 23 14:22:39 2016
New Revision: 1749898
URL: http://svn.apache.org/viewvc?rev=1749898&view=rev
Log:
59706: Add stream concurrency control (patch v3), after more testing, etc. It is not enabled by default.
Modified:
tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
tomcat/trunk/webapps/docs/changelog.xml
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java Thu Jun 23 14:22:39 2016
@@ -35,6 +35,9 @@ public class Http2Protocol implements Up
static final long DEFAULT_WRITE_TIMEOUT = 10000;
// The HTTP/2 specification recommends a minimum default of 100
static final long DEFAULT_MAX_CONCURRENT_STREAMS = 200;
+ // Maximum amount of streams which can be concurrently executed over
+ // a single connection
+ static final int DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION = 200;
// This default is defined by the HTTP/2 specification
static final int DEFAULT_INITIAL_WINDOW_SIZE = (1 << 16) - 1;
@@ -47,6 +50,7 @@ public class Http2Protocol implements Up
private long keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
private long writeTimeout = DEFAULT_WRITE_TIMEOUT;
private long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
+ private int maxConcurrentStreamExecution = DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION;
// If a lower initial value is required, set it here but DO NOT change the
// default defined above.
private int initialWindowSize = DEFAULT_INITIAL_WINDOW_SIZE;
@@ -87,6 +91,7 @@ public class Http2Protocol implements Up
result.setKeepAliveTimeout(getKeepAliveTimeout());
result.setWriteTimeout(getWriteTimeout());
result.setMaxConcurrentStreams(getMaxConcurrentStreams());
+ result.setMaxConcurrentStreamExecution(getMaxConcurrentStreamExecution());
result.setInitialWindowSize(getInitialWindowSize());
return result;
@@ -155,6 +160,16 @@ public class Http2Protocol implements Up
}
+ public int getMaxConcurrentStreamExecution() {
+ return maxConcurrentStreamExecution;
+ }
+
+
+ public void setMaxConcurrentStreamExecution(int maxConcurrentStreamExecution) {
+ this.maxConcurrentStreamExecution = maxConcurrentStreamExecution;
+ }
+
+
public int getInitialWindowSize() {
return initialWindowSize;
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Jun 23 14:22:39 2016
@@ -139,6 +139,10 @@ public class Http2UpgradeHandler extends
private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>();
private long backLogSize = 0;
+ // Stream concurrency control
+ private int maxConcurrentStreamExecution = Http2Protocol.DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION;
+ private AtomicInteger streamConcurrency = null;
+ private Queue<StreamProcessor> queuedProcessors = null;
public Http2UpgradeHandler(Adapter adapter, Request coyoteRequest) {
super (STREAM_ID_ZERO);
@@ -174,6 +178,12 @@ public class Http2UpgradeHandler extends
return;
}
+ // Init concurrency control if needed
+ if (maxConcurrentStreamExecution < localSettings.getMaxConcurrentStreams()) {
+ streamConcurrency = new AtomicInteger(0);
+ queuedProcessors = new ConcurrentLinkedQueue<>();
+ }
+
parser = new Http2Parser(connectionId, this, this);
Stream stream = null;
@@ -242,7 +252,7 @@ public class Http2UpgradeHandler extends
if (webConnection != null) {
// Process the initial request on a container thread
- StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
+ StreamProcessor streamProcessor = new StreamProcessor(this, stream, adapter, socketWrapper);
streamProcessor.setSslSupport(sslSupport);
socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
}
@@ -389,6 +399,33 @@ public class Http2UpgradeHandler extends
}
+ private int increaseStreamConcurrency() {
+ return streamConcurrency.incrementAndGet();
+ }
+
+ private int decreaseStreamConcurrency() {
+ return streamConcurrency.decrementAndGet();
+ }
+
+ private int getStreamConcurrency() {
+ return streamConcurrency.get();
+ }
+
+ void executeQueuedStream() {
+ if (streamConcurrency == null) {
+ return;
+ }
+ decreaseStreamConcurrency();
+ if (getStreamConcurrency() < maxConcurrentStreamExecution) {
+ StreamProcessor streamProcessor = queuedProcessors.poll();
+ if (streamProcessor != null) {
+ increaseStreamConcurrency();
+ socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+ }
+ }
+ }
+
+
void sendStreamReset(StreamException se) throws IOException {
if (log.isDebugEnabled()) {
@@ -990,8 +1027,11 @@ public class Http2UpgradeHandler extends
pushStream.sentPushPromise();
// Process this stream on a container thread
- StreamProcessor streamProcessor = new StreamProcessor(pushStream, adapter, socketWrapper);
+ StreamProcessor streamProcessor = new StreamProcessor(this, pushStream, adapter, socketWrapper);
streamProcessor.setSslSupport(sslSupport);
+ if (streamConcurrency != null) {
+ increaseStreamConcurrency();
+ }
socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
}
@@ -1050,6 +1090,11 @@ public class Http2UpgradeHandler extends
}
+ public void setMaxConcurrentStreamExecution(int maxConcurrentStreamExecution) {
+ this.maxConcurrentStreamExecution = maxConcurrentStreamExecution;
+ }
+
+
public void setInitialWindowSize(int initialWindowSize) {
localSettings.set(Setting.INITIAL_WINDOW_SIZE, initialWindowSize);
}
@@ -1205,9 +1250,18 @@ public class Http2UpgradeHandler extends
Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed());
if (stream != null) {
// Process this stream on a container thread
- StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
+ StreamProcessor streamProcessor = new StreamProcessor(this, stream, adapter, socketWrapper);
streamProcessor.setSslSupport(sslSupport);
- socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+ if (streamConcurrency == null) {
+ socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+ } else {
+ if (getStreamConcurrency() < maxConcurrentStreamExecution) {
+ increaseStreamConcurrency();
+ socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+ } else {
+ queuedProcessors.offer(streamProcessor);
+ }
+ }
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Thu Jun 23 14:22:39 2016
@@ -43,13 +43,15 @@ public class StreamProcessor extends Abs
private static final Log log = LogFactory.getLog(StreamProcessor.class);
private static final StringManager sm = StringManager.getManager(StreamProcessor.class);
+ private final Http2UpgradeHandler handler;
private final Stream stream;
private volatile SSLSupport sslSupport;
- public StreamProcessor(Stream stream, Adapter adapter, SocketWrapperBase<?> socketWrapper) {
+ public StreamProcessor(Http2UpgradeHandler handler, Stream stream, Adapter adapter, SocketWrapperBase<?> socketWrapper) {
super(stream.getCoyoteRequest(), stream.getCoyoteResponse());
+ this.handler = handler;
this.stream = stream;
setAdapter(adapter);
setSocketWrapper(socketWrapper);
@@ -57,36 +59,43 @@ public class StreamProcessor extends Abs
@Override
- public synchronized void run() {
- // HTTP/2 equivalent of AbstractConnectionHandler#process() without the
- // socket <-> processor mapping
- ContainerThreadMarker.set();
- SocketState state = SocketState.CLOSED;
+ public void run() {
try {
- state = process(socketWrapper, SocketEvent.OPEN_READ);
-
- if (state == SocketState.CLOSED) {
- if (!getErrorState().isConnectionIoAllowed()) {
+ // FIXME: the regular processor syncs on socketWrapper, but here this deadlocks
+ synchronized (this) {
+ // HTTP/2 equivalent of AbstractConnectionHandler#process() without the
+ // socket <-> processor mapping
+ ContainerThreadMarker.set();
+ SocketState state = SocketState.CLOSED;
+ try {
+ state = process(socketWrapper, SocketEvent.OPEN_READ);
+
+ if (state == SocketState.CLOSED) {
+ if (!getErrorState().isConnectionIoAllowed()) {
+ ConnectionException ce = new ConnectionException(sm.getString(
+ "streamProcessor.error.connection", stream.getConnectionId(),
+ stream.getIdentifier()), Http2Error.INTERNAL_ERROR);
+ stream.close(ce);
+ } else if (!getErrorState().isIoAllowed()) {
+ StreamException se = new StreamException(sm.getString(
+ "streamProcessor.error.stream", stream.getConnectionId(),
+ stream.getIdentifier()), Http2Error.INTERNAL_ERROR,
+ stream.getIdentifier().intValue());
+ stream.close(se);
+ }
+ }
+ } catch (Exception e) {
ConnectionException ce = new ConnectionException(sm.getString(
"streamProcessor.error.connection", stream.getConnectionId(),
stream.getIdentifier()), Http2Error.INTERNAL_ERROR);
+ ce.initCause(e);
stream.close(ce);
- } else if (!getErrorState().isIoAllowed()) {
- StreamException se = new StreamException(sm.getString(
- "streamProcessor.error.stream", stream.getConnectionId(),
- stream.getIdentifier()), Http2Error.INTERNAL_ERROR,
- stream.getIdentifier().intValue());
- stream.close(se);
+ } finally {
+ ContainerThreadMarker.clear();
}
}
- } catch (Exception e) {
- ConnectionException ce = new ConnectionException(sm.getString(
- "streamProcessor.error.connection", stream.getConnectionId(),
- stream.getIdentifier()), Http2Error.INTERNAL_ERROR);
- ce.initCause(e);
- stream.close(ce);
} finally {
- ContainerThreadMarker.clear();
+ handler.executeQueuedStream();
}
}
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu Jun 23 14:22:39 2016
@@ -95,6 +95,12 @@
Refactor the certifcate keystore and trust store generation to make it
easier for embedded users to inject their own key stores. (markt)
</scode>
+ <update>
+ Add a <code>maxConcurrentStreamExecution</code> on the HTTP/2
+ protocol handler to allow restricting the amount of concurrent stream
+ that are being executed in a single connection. The default is to
+ not limit it. (remm)
+ </update>
</changelog>
</subsection>
<subsection name="WebSocket">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org