You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2021/09/15 13:28:35 UTC

[tomcat] 05/05: Refactor allocations for the connection flow control window

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 10.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 23be85654d4121718610aea7a586af5748a310c9
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed Sep 15 14:12:26 2021 +0100

    Refactor allocations for the connection flow control window
    
    There are multiple related changes in this commit:
    - The BackLog tracker object is removed and replaced with fields on
      AbstractStream.
    - If an incomplete allocation is made for a stream as a result of a
      WINDOW_UPDATE frame, the remainder of the request is now discarded.
    - The connection flow control window is, effectively, reduced as soon as
      an allocation is made rather than waiting until after the Stream is
      released. This is an improvement over the previous approach where
      over-allocation was possible.
    - The loop in reserveWindowSize is removed.
---
 java/org/apache/coyote/http2/AbstractStream.java   |  27 ++
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 316 ++++++++-------------
 .../apache/coyote/http2/LocalStrings.properties    |   4 +-
 .../coyote/http2/WindowAllocationManager.java      |   3 +-
 webapps/docs/changelog.xml                         |   6 +
 5 files changed, 154 insertions(+), 202 deletions(-)

diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java
index c7374b6..6a825c2 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -40,6 +40,9 @@ abstract class AbstractStream {
     private final Set<AbstractNonZeroStream> childStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
 
+    private volatile int connectionAllocationRequested = 0;
+    private volatile int connectionAllocationMade = 0;
+
 
     AbstractStream(Integer identifier) {
         this.identifier = identifier;
@@ -154,6 +157,30 @@ abstract class AbstractStream {
     }
 
 
+    final int getConnectionAllocationRequested() {
+        return connectionAllocationRequested;
+    }
+
+
+    final void setConnectionAllocationRequested(int connectionAllocationRequested) {
+        log.debug(sm.getString("abstractStream.setConnectionAllocationRequested", getConnectionId(), getIdAsString(),
+                Integer.toString(this.connectionAllocationRequested), Integer.toString(connectionAllocationRequested)));
+        this.connectionAllocationRequested = connectionAllocationRequested;
+    }
+
+
+    final int getConnectionAllocationMade() {
+        return connectionAllocationMade;
+    }
+
+
+    final void setConnectionAllocationMade(int connectionAllocationMade) {
+        log.debug(sm.getString("abstractStream.setConnectionAllocationMade", getConnectionId(), getIdAsString(),
+                Integer.toString(this.connectionAllocationMade), Integer.toString(connectionAllocationMade)));
+        this.connectionAllocationMade = connectionAllocationMade;
+    }
+
+
     abstract String getConnectionId();
 
     abstract int getWeight();
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 045337a..529b4f7 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -20,10 +20,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
@@ -132,7 +131,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
     private final PingManager pingManager = getPingManager();
     private volatile int newStreamsSinceLastPrune = 0;
-    private final Map<AbstractStream, BacklogTracker> backLogStreams = new ConcurrentHashMap<>();
+    private final Set<AbstractStream> backLogStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long backLogSize = 0;
     // The time at which the connection will timeout unless data arrives before
     // then. -1 means no timeout.
@@ -873,101 +872,81 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         // this thread until after this thread enters wait()
         int allocation = 0;
         synchronized (stream) {
-            do {
-                synchronized (this) {
-                    if (!stream.canWrite()) {
-                        stream.doStreamCancel(sm.getString("upgradeHandler.stream.notWritable",
-                                stream.getConnectionId(), stream.getIdAsString()), Http2Error.STREAM_CLOSED);
-                    }
-                    long windowSize = getWindowSize();
-                    if (windowSize < 1 || backLogSize > 0) {
-                        // Has this stream been granted an allocation
-                        BacklogTracker tracker = backLogStreams.get(stream);
-                        if (tracker == null) {
-                            tracker = new BacklogTracker(reservation);
-                            backLogStreams.put(stream, tracker);
-                            backLogSize += reservation;
-                            // Add the parents as well
-                            AbstractStream parent = stream.getParentStream();
-                            while (parent != null && backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) {
-                                parent = parent.getParentStream();
-                            }
-                        } else {
-                            if (tracker.getUnusedAllocation() > 0) {
-                                allocation = tracker.getUnusedAllocation();
-                                decrementWindowSize(allocation);
-                                if (tracker.getRemainingReservation() == 0) {
-                                    // The reservation has been fully allocated
-                                    // so this stream can be removed from the
-                                    // backlog.
-                                    backLogStreams.remove(stream);
-                                } else {
-                                    // This allocation has been used. Leave the
-                                    // stream on the backlog as it still has
-                                    // more bytes to write.
-                                    tracker.useAllocation();
-                                }
-                            }
+            synchronized (this) {
+                if (!stream.canWrite()) {
+                    stream.doStreamCancel(sm.getString("upgradeHandler.stream.notWritable",
+                            stream.getConnectionId(), stream.getIdAsString()), Http2Error.STREAM_CLOSED);
+                }
+                long windowSize = getWindowSize();
+                if (stream.getConnectionAllocationMade() > 0) {
+                    allocation = stream.getConnectionAllocationMade();
+                    stream.setConnectionAllocationMade(0);
+                } else  if (windowSize < 1) {
+                    // Has this stream been granted an allocation
+                    if (stream.getConnectionAllocationMade() == 0) {
+                        stream.setConnectionAllocationRequested(reservation);
+                        backLogSize += reservation;
+                        backLogStreams.add(stream);
+                        // Add the parents as well
+                        AbstractStream parent = stream.getParentStream();
+                        while (parent != null && backLogStreams.add(parent)) {
+                            parent = parent.getParentStream();
                         }
-                    } else if (windowSize < reservation) {
-                        allocation = (int) windowSize;
-                        decrementWindowSize(allocation);
-                    } else {
-                        allocation = reservation;
-                        decrementWindowSize(allocation);
                     }
+                } else if (windowSize < reservation) {
+                    allocation = (int) windowSize;
+                    decrementWindowSize(allocation);
+                } else {
+                    allocation = reservation;
+                    decrementWindowSize(allocation);
                 }
-                if (allocation == 0) {
-                    if (block) {
-                        try {
-                            // Connection level window is empty. Although this
-                            // request is for a stream, use the connection
-                            // timeout
-                            long writeTimeout = protocol.getWriteTimeout();
-                            stream.waitForConnectionAllocation(writeTimeout);
-                            // Has this stream been granted an allocation
-                            // Note: If the stream in not in this Map then the
-                            //       requested write has been fully allocated
-                            BacklogTracker tracker;
-                            // Ensure allocations made in other threads are visible
-                            synchronized (this) {
-                                tracker = backLogStreams.get(stream);
-                            }
-                            if (tracker != null && tracker.getUnusedAllocation() == 0) {
-                                String msg;
-                                Http2Error error;
-                                if (stream.isActive()) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug(sm.getString("upgradeHandler.noAllocation",
-                                                connectionId, stream.getIdAsString()));
-                                    }
-                                    // No allocation
-                                    // Close the connection. Do this first since
-                                    // closing the stream will raise an exception.
-                                    close();
-                                    msg = sm.getString("stream.writeTimeout");
-                                    error = Http2Error.ENHANCE_YOUR_CALM;
-                                } else {
-                                    msg = sm.getString("stream.clientCancel");
-                                    error = Http2Error.STREAM_CLOSED;
+            }
+            if (allocation == 0) {
+                if (block) {
+                    try {
+                        // Connection level window is empty. Although this
+                        // request is for a stream, use the connection
+                        // timeout
+                        long writeTimeout = protocol.getWriteTimeout();
+                        stream.waitForConnectionAllocation(writeTimeout);
+                        // Has this stream been granted an allocation
+                        if (stream.getConnectionAllocationMade() == 0) {
+                            String msg;
+                            Http2Error error;
+                            if (stream.isActive()) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug(sm.getString("upgradeHandler.noAllocation",
+                                            connectionId, stream.getIdAsString()));
                                 }
-                                // Close the stream
-                                // This thread is in application code so need
-                                // to signal to the application that the
-                                // stream is closing
-                                stream.doStreamCancel(msg, error);
+                                // No allocation
+                                // Close the connection. Do this first since
+                                // closing the stream will raise an exception.
+                                close();
+                                msg = sm.getString("stream.writeTimeout");
+                                error = Http2Error.ENHANCE_YOUR_CALM;
+                            } else {
+                                msg = sm.getString("stream.clientCancel");
+                                error = Http2Error.STREAM_CLOSED;
                             }
-                        } catch (InterruptedException e) {
-                            throw new IOException(sm.getString(
-                                    "upgradeHandler.windowSizeReservationInterrupted", connectionId,
-                                    stream.getIdAsString(), Integer.toString(reservation)), e);
+                            // Close the stream
+                            // This thread is in application code so need
+                            // to signal to the application that the
+                            // stream is closing
+                            stream.doStreamCancel(msg, error);
+                        } else {
+                            allocation = stream.getConnectionAllocationMade();
+                            stream.setConnectionAllocationMade(0);
                         }
-                    } else {
-                        stream.waitForConnectionAllocationNonBlocking();
-                        return 0;
+                    } catch (InterruptedException e) {
+                        throw new IOException(sm.getString(
+                                "upgradeHandler.windowSizeReservationInterrupted", connectionId,
+                                stream.getIdAsString(), Integer.toString(reservation)), e);
                     }
+                } else {
+                    stream.waitForConnectionAllocationNonBlocking();
+                    return 0;
                 }
-            } while (allocation == 0);
+            }
         }
         return allocation;
     }
@@ -983,19 +962,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         synchronized (this) {
             long windowSize = getWindowSize();
             if (windowSize < 1 && windowSize + increment > 0) {
-                //  Connection window is completed exhausted. Assume there will
-                // be streams to notify. The overhead is minimal if there are
-                // none.
+                // Connection window is exhausted. Assume there will be streams
+                // to notify. The overhead is minimal if there are none.
                 streamsToNotify = releaseBackLog((int) (windowSize +increment));
-            } else if (backLogSize > 0) {
-                // While windowSize is greater than zero, all of it has already
-                // been allocated to streams in the backlog (or just about to
-                // exit the backlog). If any of windowSize was unallocated or
-                // 'spare', backLogSize would be zero. Therefore, apply this
-                // addition allocation to the backlog.
-                streamsToNotify = releaseBackLog(increment);
+            } else {
+                super.incrementWindowSize(increment);
             }
-            super.incrementWindowSize(increment);
         }
 
         if (streamsToNotify != null) {
@@ -1029,26 +1001,34 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     }
 
 
-    private synchronized Set<AbstractStream> releaseBackLog(int increment) {
+    private synchronized Set<AbstractStream> releaseBackLog(int increment) throws Http2Exception {
         Set<AbstractStream> result = new HashSet<>();
-        if (backLogSize < increment) {
+        int remaining = increment;
+        if (backLogSize < remaining) {
             // Can clear the whole backlog
-            result.addAll(backLogStreams.keySet());
-            backLogStreams.clear();
+            for (AbstractStream stream : backLogStreams) {
+                if (stream.getConnectionAllocationRequested() > 0) {
+                    stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
+                    stream.setConnectionAllocationRequested(0);
+                }
+            }
+            remaining -= backLogSize;
             backLogSize = 0;
+            super.incrementWindowSize(remaining);
+
+            result.addAll(backLogStreams);
+            backLogStreams.clear();
         } else {
-            int leftToAllocate = increment;
-            while (leftToAllocate > 0) {
-                leftToAllocate = allocate(this, leftToAllocate);
-            }
-            for (Entry<AbstractStream,BacklogTracker> entry : backLogStreams.entrySet()) {
-                int allocation = entry.getValue().getUnusedAllocation();
-                if (allocation > 0) {
-                    backLogSize -= allocation;
-                    if (!entry.getValue().isNotifyInProgress()) {
-                        result.add(entry.getKey());
-                        entry.getValue().startNotify();
-                    }
+            allocate(this, remaining);
+            Iterator<AbstractStream> streamIter = backLogStreams.iterator();
+            while (streamIter.hasNext()) {
+                AbstractStream stream = streamIter.next();
+                if (stream.getConnectionAllocationMade() > 0) {
+                    backLogSize -= stream.getConnectionAllocationMade();
+                    backLogSize -= stream.getConnectionAllocationRequested();
+                    stream.setConnectionAllocationRequested(0);
+                    result.add(stream);
+                    streamIter.remove();
                 }
             }
         }
@@ -1061,10 +1041,20 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(),
                     stream.getIdAsString(), Integer.toString(allocation)));
         }
-        // Allocate to the specified stream
-        BacklogTracker tracker = backLogStreams.get(stream);
 
-        int leftToAllocate = tracker.allocate(allocation);
+        int leftToAllocate = allocation;
+
+        if (stream.getConnectionAllocationRequested() > 0) {
+            int allocatedThisTime;
+            if (allocation >= stream.getConnectionAllocationRequested()) {
+                allocatedThisTime = stream.getConnectionAllocationRequested();
+            } else {
+                allocatedThisTime = allocation;
+            }
+            stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested() - allocatedThisTime);
+            stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + allocatedThisTime);
+            leftToAllocate = leftToAllocate - allocatedThisTime;
+        }
 
         if (leftToAllocate == 0) {
             return 0;
@@ -1078,14 +1068,17 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         // Recipients are children of the current stream that are in the
         // backlog.
         Set<AbstractStream> recipients = new HashSet<>(stream.getChildStreams());
-        recipients.retainAll(backLogStreams.keySet());
+        recipients.retainAll(backLogStreams);
 
         // Loop until we run out of allocation or recipients
         while (leftToAllocate > 0) {
             if (recipients.size() == 0) {
-                if (tracker.getUnusedAllocation() == 0) {
+                if (stream.getConnectionAllocationMade() == 0) {
                     backLogStreams.remove(stream);
                 }
+                if (stream.getIdAsInt() == 0) {
+                    throw new IllegalStateException();
+                }
                 return leftToAllocate;
             }
 
@@ -1832,8 +1825,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             if (average < overheadThreshold) {
                 // For Streams, client might only release the minimum so check
                 // against current demand
-                BacklogTracker tracker = backLogStreams.get(stream);
-                if (tracker == null || increment < tracker.getRemainingReservation()) {
+                if (increment < stream.getConnectionAllocationRequested()) {
                     // The smaller the increment, the larger the overhead
                     increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average);
                 }
@@ -2044,80 +2036,4 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             payload = ByteBuffer.allocate(payload.capacity() * 2);
         }
     }
-
-
-    private static class BacklogTracker {
-
-        private int remainingReservation;
-        private int unusedAllocation;
-        private boolean notifyInProgress;
-
-        public BacklogTracker() {
-        }
-
-        public BacklogTracker(int reservation) {
-            remainingReservation = reservation;
-        }
-
-        /**
-         * @return The number of bytes requiring an allocation from the
-         *         Connection flow control window
-         */
-        public int getRemainingReservation() {
-            return remainingReservation;
-        }
-
-        /**
-         *
-         * @return The number of bytes allocated from the Connection flow
-         *         control window but not yet written
-         */
-        public int getUnusedAllocation() {
-            return unusedAllocation;
-        }
-
-        /**
-         * The purpose of this is to avoid the incorrect triggering of a timeout
-         * for the following sequence of events:
-         * <ol>
-         * <li>window update 1</li>
-         * <li>allocation 1</li>
-         * <li>notify 1</li>
-         * <li>window update 2</li>
-         * <li>allocation 2</li>
-         * <li>act on notify 1 (using allocation 1 and 2)</li>
-         * <li>notify 2</li>
-         * <li>act on notify 2 (timeout due to no allocation)</li>
-         * </ol>
-         *
-         * @return {@code true} if a notify has been issued but the associated
-         *         allocation has not been used, otherwise {@code false}
-         */
-        public boolean isNotifyInProgress() {
-            return notifyInProgress;
-        }
-
-        public void useAllocation() {
-            unusedAllocation = 0;
-            notifyInProgress = false;
-        }
-
-        public void startNotify() {
-            notifyInProgress = true;
-        }
-
-        private int allocate(int allocation) {
-            if (remainingReservation >= allocation) {
-                remainingReservation -= allocation;
-                unusedAllocation += allocation;
-                return 0;
-            }
-
-            int left = allocation - remainingReservation;
-            unusedAllocation += remainingReservation;
-            remainingReservation = 0;
-
-            return left;
-        }
-    }
 }
diff --git a/java/org/apache/coyote/http2/LocalStrings.properties b/java/org/apache/coyote/http2/LocalStrings.properties
index cf36ded..02f7abe 100644
--- a/java/org/apache/coyote/http2/LocalStrings.properties
+++ b/java/org/apache/coyote/http2/LocalStrings.properties
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+abstractStream.setConnectionAllocationRequested=Connection [{0}], Stream [{1}], connection allocation requested changed from [{2}] to [{3}]
+abstractStream.setConnectionAllocationMade=Connection [{0}], Stream [{1}], connection allocation made changed from [{2}] to [{3}]
 abstractStream.windowSizeDec=Connection [{0}], Stream [{1}], reduce flow control window by [{2}] to [{3}]
 abstractStream.windowSizeInc=Connection [{0}], Stream [{1}], increase flow control window by [{2}] to [{3}]
 abstractStream.windowSizeTooBig=Connection [{0}], Stream [{1}], increased window size by [{2}] to [{3}] which exceeded permitted maximum
@@ -169,7 +171,7 @@ upgradeHandler.writePushHeaders=Connection [{0}], Stream [{1}], Pushed stream [{
 windowAllocationManager.dispatched=Connection [{0}], Stream [{1}], Dispatched
 windowAllocationManager.notified=Connection [{0}], Stream [{1}], Notified
 windowAllocationManager.notify=Connection [{0}], Stream [{1}], Waiting type [{2}], Notify type [{3}]
-windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], Waiting for Connection flow control window (blocking) with timeout [{2}]
+windowAllocationManager.waitFor.connection=Connection [{0}], Stream [{1}], Waiting for [{2}] bytes from connection flow control window (blocking) with timeout [{3}]
 windowAllocationManager.waitFor.ise=Connection [{0}], Stream [{1}], Already waiting
 windowAllocationManager.waitFor.stream=Connection [{0}], Stream [{1}], Waiting for Stream flow control window (blocking) with timeout [{2}]
 windowAllocationManager.waitForNonBlocking.connection=Connection [{0}], Stream [{1}], Waiting for Connection flow control window (non-blocking)
diff --git a/java/org/apache/coyote/http2/WindowAllocationManager.java b/java/org/apache/coyote/http2/WindowAllocationManager.java
index 6c824a5..45ac2fd 100644
--- a/java/org/apache/coyote/http2/WindowAllocationManager.java
+++ b/java/org/apache/coyote/http2/WindowAllocationManager.java
@@ -78,7 +78,8 @@ class WindowAllocationManager {
     void waitForConnection(long timeout) throws InterruptedException {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("windowAllocationManager.waitFor.connection",
-                    stream.getConnectionId(), stream.getIdAsString(), Long.toString(timeout)));
+                    stream.getConnectionId(), stream.getIdAsString(),
+                    Integer.toString(stream.getConnectionAllocationRequested()), Long.toString(timeout)));
         }
 
         waitFor(CONNECTION, timeout);
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index a13f801..cb1841a 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -125,6 +125,12 @@
         Correct a potential <code>StackOverflowException</code> with HTTP/2 and
         sendfile. (markt)
       </fix>
+      <fix>
+        Further improvements in the management of the connection flow control
+        window. This addresses various bugs that could cause streams to
+        incorrectly report that they had timed out waiting for an allocation
+        from the connection flow control window. (markt)
+      </fix>
     </changelog>
   </subsection>
 </section>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org