You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2021/09/13 16:29:24 UTC

[tomcat] 02/02: Revert "Step 1 - merge BackLogTracker into AbstractStream"

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit aca188a85b607f5fad3457619ed453ba0b7180a7
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Sep 13 17:28:16 2021 +0100

    Revert "Step 1 - merge BackLogTracker into AbstractStream"
    
    This reverts commit 74681f56475541ff6069aadb1a88bd4d687e7446.
---
 java/org/apache/coyote/http2/AbstractStream.java   |  65 -----------
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 128 +++++++++++++++++----
 2 files changed, 107 insertions(+), 86 deletions(-)

diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java
index d3195eb..c7374b6 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -40,9 +40,6 @@ abstract class AbstractStream {
     private final Set<AbstractNonZeroStream> childStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
 
-    volatile int remainingReservation;
-    volatile int unusedAllocation;
-    volatile boolean notifyInProgress;
 
     AbstractStream(Integer identifier) {
         this.identifier = identifier;
@@ -160,66 +157,4 @@ abstract class AbstractStream {
     abstract String getConnectionId();
 
     abstract int getWeight();
-
-
-    /**
-     * @return The number of bytes requiring an allocation from the
-     *         Connection flow control window
-     */
-    public int getRemainingReservation() {
-        return remainingReservation;
-    }
-
-    /**
-     *
-     * @return The number of bytes allocated from the Connection flow
-     *         control window but not yet written
-     */
-    public int getUnusedAllocation() {
-        return unusedAllocation;
-    }
-
-    /**
-     * The purpose of this is to avoid the incorrect triggering of a timeout
-     * for the following sequence of events:
-     * <ol>
-     * <li>window update 1</li>
-     * <li>allocation 1</li>
-     * <li>notify 1</li>
-     * <li>window update 2</li>
-     * <li>allocation 2</li>
-     * <li>act on notify 1 (using allocation 1 and 2)</li>
-     * <li>notify 2</li>
-     * <li>act on notify 2 (timeout due to no allocation)</li>
-     * </ol>
-     *
-     * @return {@code true} if a notify has been issued but the associated
-     *         allocation has not been used, otherwise {@code false}
-     */
-    public boolean isNotifyInProgress() {
-        return notifyInProgress;
-    }
-
-    public void useAllocation() {
-        unusedAllocation = 0;
-        notifyInProgress = false;
-    }
-
-    public void startNotify() {
-        notifyInProgress = true;
-    }
-
-    protected int allocate(int allocation) {
-        if (remainingReservation >= allocation) {
-            remainingReservation -= allocation;
-            unusedAllocation += allocation;
-            return 0;
-        }
-
-        int left = allocation - remainingReservation;
-        unusedAllocation += remainingReservation;
-        remainingReservation = 0;
-
-        return left;
-    }
 }
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index b45dad0..f28ae6d 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -20,9 +20,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
@@ -131,7 +132,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
     private final PingManager pingManager = getPingManager();
     private volatile int newStreamsSinceLastPrune = 0;
-    private final Set<AbstractStream> backLogStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Map<AbstractStream, BacklogTracker> backLogStreams = new ConcurrentHashMap<>();
     private long backLogSize = 0;
     // The time at which the connection will timeout unless data arrives before
     // then. -1 means no timeout.
@@ -881,20 +882,21 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                     long windowSize = getWindowSize();
                     if (windowSize < 1 || backLogSize > 0) {
                         // Has this stream been granted an allocation
-                        if (stream.remainingReservation == 0) {
-                            backLogStreams.add(stream);
-                            stream.remainingReservation = reservation;
+                        BacklogTracker tracker = backLogStreams.get(stream);
+                        if (tracker == null) {
+                            tracker = new BacklogTracker(reservation);
+                            backLogStreams.put(stream, tracker);
                             backLogSize += reservation;
                             // Add the parents as well
                             AbstractStream parent = stream.getParentStream();
-                            while (parent != null && backLogStreams.add(parent)) {
+                            while (parent != null && backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) {
                                 parent = parent.getParentStream();
                             }
                         } else {
-                            if (stream.getUnusedAllocation() > 0) {
-                                allocation = stream.getUnusedAllocation();
+                            if (tracker.getUnusedAllocation() > 0) {
+                                allocation = tracker.getUnusedAllocation();
                                 decrementWindowSize(allocation);
-                                if (stream.getRemainingReservation() == 0) {
+                                if (tracker.getRemainingReservation() == 0) {
                                     // The reservation has been fully allocated
                                     // so this stream can be removed from the
                                     // backlog.
@@ -903,7 +905,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                                     // This allocation has been used. Leave the
                                     // stream on the backlog as it still has
                                     // more bytes to write.
-                                    stream.useAllocation();
+                                    tracker.useAllocation();
                                 }
                             }
                         }
@@ -926,7 +928,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                             // Has this stream been granted an allocation
                             // Note: If the stream in not in this Map then the
                             //       requested write has been fully allocated
-                            if (stream.getUnusedAllocation() == 0) {
+                            BacklogTracker tracker;
+                            // Ensure allocations made in other threads are visible
+                            synchronized (this) {
+                                tracker = backLogStreams.get(stream);
+                            }
+                            if (tracker != null && tracker.getUnusedAllocation() == 0) {
                                 String msg;
                                 Http2Error error;
                                 if (stream.isActive()) {
@@ -1026,7 +1033,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         Set<AbstractStream> result = new HashSet<>();
         if (backLogSize < increment) {
             // Can clear the whole backlog
-            result.addAll(backLogStreams);
+            result.addAll(backLogStreams.keySet());
             backLogStreams.clear();
             backLogSize = 0;
         } else {
@@ -1034,13 +1041,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             while (leftToAllocate > 0) {
                 leftToAllocate = allocate(this, leftToAllocate);
             }
-            for (AbstractStream stream : backLogStreams) {
-                int allocation = stream.getUnusedAllocation();
+            for (Entry<AbstractStream,BacklogTracker> entry : backLogStreams.entrySet()) {
+                int allocation = entry.getValue().getUnusedAllocation();
                 if (allocation > 0) {
                     backLogSize -= allocation;
-                    if (!stream.isNotifyInProgress()) {
-                        result.add(stream);
-                        stream.startNotify();
+                    if (!entry.getValue().isNotifyInProgress()) {
+                        result.add(entry.getKey());
+                        entry.getValue().startNotify();
                     }
                 }
             }
@@ -1054,8 +1061,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(),
                     stream.getIdAsString(), Integer.toString(allocation)));
         }
+        // Allocate to the specified stream
+        BacklogTracker tracker = backLogStreams.get(stream);
 
-        int leftToAllocate = stream.allocate(allocation);
+        int leftToAllocate = tracker.allocate(allocation);
 
         if (leftToAllocate == 0) {
             return 0;
@@ -1069,12 +1078,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         // Recipients are children of the current stream that are in the
         // backlog.
         Set<AbstractStream> recipients = new HashSet<>(stream.getChildStreams());
-        recipients.retainAll(backLogStreams);
+        recipients.retainAll(backLogStreams.keySet());
 
         // Loop until we run out of allocation or recipients
         while (leftToAllocate > 0) {
             if (recipients.size() == 0) {
-                if (stream.getUnusedAllocation() == 0) {
+                if (tracker.getUnusedAllocation() == 0) {
                     backLogStreams.remove(stream);
                 }
                 return leftToAllocate;
@@ -1823,7 +1832,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             if (average < overheadThreshold) {
                 // For Streams, client might only release the minimum so check
                 // against current demand
-                if (increment < stream.getRemainingReservation()) {
+                BacklogTracker tracker = backLogStreams.get(stream);
+                if (tracker == null || increment < tracker.getRemainingReservation()) {
                     // The smaller the increment, the larger the overhead
                     increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average);
                 }
@@ -2034,4 +2044,80 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             payload = ByteBuffer.allocate(payload.capacity() * 2);
         }
     }
+
+
+    private static class BacklogTracker {
+
+        private int remainingReservation;
+        private int unusedAllocation;
+        private boolean notifyInProgress;
+
+        public BacklogTracker() {
+        }
+
+        public BacklogTracker(int reservation) {
+            remainingReservation = reservation;
+        }
+
+        /**
+         * @return The number of bytes requiring an allocation from the
+         *         Connection flow control window
+         */
+        public int getRemainingReservation() {
+            return remainingReservation;
+        }
+
+        /**
+         *
+         * @return The number of bytes allocated from the Connection flow
+         *         control window but not yet written
+         */
+        public int getUnusedAllocation() {
+            return unusedAllocation;
+        }
+
+        /**
+         * The purpose of this is to avoid the incorrect triggering of a timeout
+         * for the following sequence of events:
+         * <ol>
+         * <li>window update 1</li>
+         * <li>allocation 1</li>
+         * <li>notify 1</li>
+         * <li>window update 2</li>
+         * <li>allocation 2</li>
+         * <li>act on notify 1 (using allocation 1 and 2)</li>
+         * <li>notify 2</li>
+         * <li>act on notify 2 (timeout due to no allocation)</li>
+         * </ol>
+         *
+         * @return {@code true} if a notify has been issued but the associated
+         *         allocation has not been used, otherwise {@code false}
+         */
+        public boolean isNotifyInProgress() {
+            return notifyInProgress;
+        }
+
+        public void useAllocation() {
+            unusedAllocation = 0;
+            notifyInProgress = false;
+        }
+
+        public void startNotify() {
+            notifyInProgress = true;
+        }
+
+        private int allocate(int allocation) {
+            if (remainingReservation >= allocation) {
+                remainingReservation -= allocation;
+                unusedAllocation += allocation;
+                return 0;
+            }
+
+            int left = allocation - remainingReservation;
+            unusedAllocation += remainingReservation;
+            remainingReservation = 0;
+
+            return left;
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org