You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2016/06/23 14:22:39 UTC

svn commit: r1749898 - in /tomcat/trunk: java/org/apache/coyote/http2/Http2Protocol.java java/org/apache/coyote/http2/Http2UpgradeHandler.java java/org/apache/coyote/http2/StreamProcessor.java webapps/docs/changelog.xml

Author: remm
Date: Thu Jun 23 14:22:39 2016
New Revision: 1749898

URL: http://svn.apache.org/viewvc?rev=1749898&view=rev
Log:
59706: Add stream concurrency control (patch v3), after more testing, etc. It is not enabled by default.

Modified:
    tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java Thu Jun 23 14:22:39 2016
@@ -35,6 +35,9 @@ public class Http2Protocol implements Up
     static final long DEFAULT_WRITE_TIMEOUT = 10000;
     // The HTTP/2 specification recommends a minimum default of 100
     static final long DEFAULT_MAX_CONCURRENT_STREAMS = 200;
+    // Maximum amount of streams which can be concurrently executed over
+    // a single connection
+    static final int DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION = 200;
     // This default is defined by the HTTP/2 specification
     static final int DEFAULT_INITIAL_WINDOW_SIZE = (1 << 16) - 1;
 
@@ -47,6 +50,7 @@ public class Http2Protocol implements Up
     private long keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
     private long writeTimeout = DEFAULT_WRITE_TIMEOUT;
     private long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
+    private int maxConcurrentStreamExecution = DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION;
     // If a lower initial value is required, set it here but DO NOT change the
     // default defined above.
     private int initialWindowSize = DEFAULT_INITIAL_WINDOW_SIZE;
@@ -87,6 +91,7 @@ public class Http2Protocol implements Up
         result.setKeepAliveTimeout(getKeepAliveTimeout());
         result.setWriteTimeout(getWriteTimeout());
         result.setMaxConcurrentStreams(getMaxConcurrentStreams());
+        result.setMaxConcurrentStreamExecution(getMaxConcurrentStreamExecution());
         result.setInitialWindowSize(getInitialWindowSize());
 
         return result;
@@ -155,6 +160,16 @@ public class Http2Protocol implements Up
     }
 
 
+    public int getMaxConcurrentStreamExecution() {
+        return maxConcurrentStreamExecution;
+    }
+
+
+    public void setMaxConcurrentStreamExecution(int maxConcurrentStreamExecution) {
+        this.maxConcurrentStreamExecution = maxConcurrentStreamExecution;
+    }
+
+
     public int getInitialWindowSize() {
         return initialWindowSize;
     }

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Jun 23 14:22:39 2016
@@ -139,6 +139,10 @@ public class Http2UpgradeHandler extends
     private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>();
     private long backLogSize = 0;
 
+    // Stream concurrency control
+    private int maxConcurrentStreamExecution = Http2Protocol.DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION;
+    private AtomicInteger streamConcurrency = null;
+    private Queue<StreamProcessor> queuedProcessors = null;
 
     public Http2UpgradeHandler(Adapter adapter, Request coyoteRequest) {
         super (STREAM_ID_ZERO);
@@ -174,6 +178,12 @@ public class Http2UpgradeHandler extends
             return;
         }
 
+        // Init concurrency control if needed
+        if (maxConcurrentStreamExecution < localSettings.getMaxConcurrentStreams()) {
+            streamConcurrency = new AtomicInteger(0);
+            queuedProcessors = new ConcurrentLinkedQueue<>();
+        }
+
         parser = new Http2Parser(connectionId, this, this);
 
         Stream stream = null;
@@ -242,7 +252,7 @@ public class Http2UpgradeHandler extends
 
         if (webConnection != null) {
             // Process the initial request on a container thread
-            StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
+            StreamProcessor streamProcessor = new StreamProcessor(this, stream, adapter, socketWrapper);
             streamProcessor.setSslSupport(sslSupport);
             socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
         }
@@ -389,6 +399,33 @@ public class Http2UpgradeHandler extends
     }
 
 
+    private int increaseStreamConcurrency() {
+        return streamConcurrency.incrementAndGet();
+    }
+
+    private int decreaseStreamConcurrency() {
+        return streamConcurrency.decrementAndGet();
+    }
+
+    private int getStreamConcurrency() {
+        return streamConcurrency.get();
+    }
+
+    void executeQueuedStream() {
+        if (streamConcurrency == null) {
+            return;
+        }
+        decreaseStreamConcurrency();
+        if (getStreamConcurrency() < maxConcurrentStreamExecution) {
+            StreamProcessor streamProcessor = queuedProcessors.poll();
+            if (streamProcessor != null) {
+                increaseStreamConcurrency();
+                socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+            }
+        }
+    }
+
+
     void sendStreamReset(StreamException se) throws IOException {
 
         if (log.isDebugEnabled()) {
@@ -990,8 +1027,11 @@ public class Http2UpgradeHandler extends
         pushStream.sentPushPromise();
 
         // Process this stream on a container thread
-        StreamProcessor streamProcessor = new StreamProcessor(pushStream, adapter, socketWrapper);
+        StreamProcessor streamProcessor = new StreamProcessor(this, pushStream, adapter, socketWrapper);
         streamProcessor.setSslSupport(sslSupport);
+        if (streamConcurrency != null) {
+            increaseStreamConcurrency();
+        }
         socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
     }
 
@@ -1050,6 +1090,11 @@ public class Http2UpgradeHandler extends
     }
 
 
+    public void setMaxConcurrentStreamExecution(int maxConcurrentStreamExecution) {
+        this.maxConcurrentStreamExecution = maxConcurrentStreamExecution;
+    }
+
+
     public void setInitialWindowSize(int initialWindowSize) {
         localSettings.set(Setting.INITIAL_WINDOW_SIZE, initialWindowSize);
     }
@@ -1205,9 +1250,18 @@ public class Http2UpgradeHandler extends
         Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed());
         if (stream != null) {
             // Process this stream on a container thread
-            StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
+            StreamProcessor streamProcessor = new StreamProcessor(this, stream, adapter, socketWrapper);
             streamProcessor.setSslSupport(sslSupport);
-            socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+            if (streamConcurrency == null) {
+                socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+            } else {
+                if (getStreamConcurrency() < maxConcurrentStreamExecution) {
+                    increaseStreamConcurrency();
+                    socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+                } else {
+                    queuedProcessors.offer(streamProcessor);
+                }
+            }
         }
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Thu Jun 23 14:22:39 2016
@@ -43,13 +43,15 @@ public class StreamProcessor extends Abs
     private static final Log log = LogFactory.getLog(StreamProcessor.class);
     private static final StringManager sm = StringManager.getManager(StreamProcessor.class);
 
+    private final Http2UpgradeHandler handler;
     private final Stream stream;
 
     private volatile SSLSupport sslSupport;
 
 
-    public StreamProcessor(Stream stream, Adapter adapter, SocketWrapperBase<?> socketWrapper) {
+    public StreamProcessor(Http2UpgradeHandler handler, Stream stream, Adapter adapter, SocketWrapperBase<?> socketWrapper) {
         super(stream.getCoyoteRequest(), stream.getCoyoteResponse());
+        this.handler = handler;
         this.stream = stream;
         setAdapter(adapter);
         setSocketWrapper(socketWrapper);
@@ -57,36 +59,43 @@ public class StreamProcessor extends Abs
 
 
     @Override
-    public synchronized void run() {
-        // HTTP/2 equivalent of AbstractConnectionHandler#process() without the
-        // socket <-> processor mapping
-        ContainerThreadMarker.set();
-        SocketState state = SocketState.CLOSED;
+    public void run() {
         try {
-            state = process(socketWrapper, SocketEvent.OPEN_READ);
-
-            if (state == SocketState.CLOSED) {
-                if (!getErrorState().isConnectionIoAllowed()) {
+            // FIXME: the regular processor syncs on socketWrapper, but here this deadlocks
+            synchronized (this) {
+                // HTTP/2 equivalent of AbstractConnectionHandler#process() without the
+                // socket <-> processor mapping
+                ContainerThreadMarker.set();
+                SocketState state = SocketState.CLOSED;
+                try {
+                    state = process(socketWrapper, SocketEvent.OPEN_READ);
+
+                    if (state == SocketState.CLOSED) {
+                        if (!getErrorState().isConnectionIoAllowed()) {
+                            ConnectionException ce = new ConnectionException(sm.getString(
+                                    "streamProcessor.error.connection", stream.getConnectionId(),
+                                    stream.getIdentifier()), Http2Error.INTERNAL_ERROR);
+                            stream.close(ce);
+                        } else if (!getErrorState().isIoAllowed()) {
+                            StreamException se = new StreamException(sm.getString(
+                                    "streamProcessor.error.stream", stream.getConnectionId(),
+                                    stream.getIdentifier()), Http2Error.INTERNAL_ERROR,
+                                    stream.getIdentifier().intValue());
+                            stream.close(se);
+                        }
+                    }
+                } catch (Exception e) {
                     ConnectionException ce = new ConnectionException(sm.getString(
                             "streamProcessor.error.connection", stream.getConnectionId(),
                             stream.getIdentifier()), Http2Error.INTERNAL_ERROR);
+                    ce.initCause(e);
                     stream.close(ce);
-                } else if (!getErrorState().isIoAllowed()) {
-                    StreamException se = new StreamException(sm.getString(
-                            "streamProcessor.error.stream", stream.getConnectionId(),
-                            stream.getIdentifier()), Http2Error.INTERNAL_ERROR,
-                            stream.getIdentifier().intValue());
-                    stream.close(se);
+                } finally {
+                    ContainerThreadMarker.clear();
                 }
             }
-        } catch (Exception e) {
-            ConnectionException ce = new ConnectionException(sm.getString(
-                    "streamProcessor.error.connection", stream.getConnectionId(),
-                    stream.getIdentifier()), Http2Error.INTERNAL_ERROR);
-            ce.initCause(e);
-            stream.close(ce);
         } finally {
-            ContainerThreadMarker.clear();
+            handler.executeQueuedStream();
         }
     }
 

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1749898&r1=1749897&r2=1749898&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu Jun 23 14:22:39 2016
@@ -95,6 +95,12 @@
         Refactor the certifcate keystore and trust store generation to make it
         easier for embedded users to inject their own key stores. (markt)
       </scode>
+      <update>
+        Add a <code>maxConcurrentStreamExecution</code> on the HTTP/2
+        protocol handler to allow restricting the amount of concurrent stream
+        that are being executed in a single connection. The default is to
+        not limit it. (remm)
+      </update>
     </changelog>
   </subsection>
   <subsection name="WebSocket">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org