You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2021/09/13 16:05:10 UTC

[tomcat] 02/03: Step 1 - merge BackLogTracker into AbstractStream

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 74681f56475541ff6069aadb1a88bd4d687e7446
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Sep 13 15:10:58 2021 +0100

    Step 1 - merge BackLogTracker into AbstractStream
---
 java/org/apache/coyote/http2/AbstractStream.java   |  65 +++++++++++
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 128 ++++-----------------
 2 files changed, 86 insertions(+), 107 deletions(-)

diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java
index c7374b6..d3195eb 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -40,6 +40,9 @@ abstract class AbstractStream {
     private final Set<AbstractNonZeroStream> childStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
 
+    volatile int remainingReservation;
+    volatile int unusedAllocation;
+    volatile boolean notifyInProgress;
 
     AbstractStream(Integer identifier) {
         this.identifier = identifier;
@@ -157,4 +160,66 @@ abstract class AbstractStream {
     abstract String getConnectionId();
 
     abstract int getWeight();
+
+
+    /**
+     * @return The number of bytes requiring an allocation from the
+     *         Connection flow control window
+     */
+    public int getRemainingReservation() {
+        return remainingReservation;
+    }
+
+    /**
+     *
+     * @return The number of bytes allocated from the Connection flow
+     *         control window but not yet written
+     */
+    public int getUnusedAllocation() {
+        return unusedAllocation;
+    }
+
+    /**
+     * The purpose of this is to avoid the incorrect triggering of a timeout
+     * for the following sequence of events:
+     * <ol>
+     * <li>window update 1</li>
+     * <li>allocation 1</li>
+     * <li>notify 1</li>
+     * <li>window update 2</li>
+     * <li>allocation 2</li>
+     * <li>act on notify 1 (using allocation 1 and 2)</li>
+     * <li>notify 2</li>
+     * <li>act on notify 2 (timeout due to no allocation)</li>
+     * </ol>
+     *
+     * @return {@code true} if a notify has been issued but the associated
+     *         allocation has not been used, otherwise {@code false}
+     */
+    public boolean isNotifyInProgress() {
+        return notifyInProgress;
+    }
+
+    public void useAllocation() {
+        unusedAllocation = 0;
+        notifyInProgress = false;
+    }
+
+    public void startNotify() {
+        notifyInProgress = true;
+    }
+
+    protected int allocate(int allocation) {
+        if (remainingReservation >= allocation) {
+            remainingReservation -= allocation;
+            unusedAllocation += allocation;
+            return 0;
+        }
+
+        int left = allocation - remainingReservation;
+        unusedAllocation += remainingReservation;
+        remainingReservation = 0;
+
+        return left;
+    }
 }
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index f28ae6d..b45dad0 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -20,10 +20,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
@@ -132,7 +131,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
     private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
     private final PingManager pingManager = getPingManager();
     private volatile int newStreamsSinceLastPrune = 0;
-    private final Map<AbstractStream, BacklogTracker> backLogStreams = new ConcurrentHashMap<>();
+    private final Set<AbstractStream> backLogStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private long backLogSize = 0;
     // The time at which the connection will timeout unless data arrives before
     // then. -1 means no timeout.
@@ -882,21 +881,20 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                     long windowSize = getWindowSize();
                     if (windowSize < 1 || backLogSize > 0) {
                         // Has this stream been granted an allocation
-                        BacklogTracker tracker = backLogStreams.get(stream);
-                        if (tracker == null) {
-                            tracker = new BacklogTracker(reservation);
-                            backLogStreams.put(stream, tracker);
+                        if (stream.remainingReservation == 0) {
+                            backLogStreams.add(stream);
+                            stream.remainingReservation = reservation;
                             backLogSize += reservation;
                             // Add the parents as well
                             AbstractStream parent = stream.getParentStream();
-                            while (parent != null && backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) {
+                            while (parent != null && backLogStreams.add(parent)) {
                                 parent = parent.getParentStream();
                             }
                         } else {
-                            if (tracker.getUnusedAllocation() > 0) {
-                                allocation = tracker.getUnusedAllocation();
+                            if (stream.getUnusedAllocation() > 0) {
+                                allocation = stream.getUnusedAllocation();
                                 decrementWindowSize(allocation);
-                                if (tracker.getRemainingReservation() == 0) {
+                                if (stream.getRemainingReservation() == 0) {
                                     // The reservation has been fully allocated
                                     // so this stream can be removed from the
                                     // backlog.
@@ -905,7 +903,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                                     // This allocation has been used. Leave the
                                     // stream on the backlog as it still has
                                     // more bytes to write.
-                                    tracker.useAllocation();
+                                    stream.useAllocation();
                                 }
                             }
                         }
@@ -928,12 +926,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
                             // Has this stream been granted an allocation
                             // Note: If the stream in not in this Map then the
                             //       requested write has been fully allocated
-                            BacklogTracker tracker;
-                            // Ensure allocations made in other threads are visible
-                            synchronized (this) {
-                                tracker = backLogStreams.get(stream);
-                            }
-                            if (tracker != null && tracker.getUnusedAllocation() == 0) {
+                            if (stream.getUnusedAllocation() == 0) {
                                 String msg;
                                 Http2Error error;
                                 if (stream.isActive()) {
@@ -1033,7 +1026,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         Set<AbstractStream> result = new HashSet<>();
         if (backLogSize < increment) {
             // Can clear the whole backlog
-            result.addAll(backLogStreams.keySet());
+            result.addAll(backLogStreams);
             backLogStreams.clear();
             backLogSize = 0;
         } else {
@@ -1041,13 +1034,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             while (leftToAllocate > 0) {
                 leftToAllocate = allocate(this, leftToAllocate);
             }
-            for (Entry<AbstractStream,BacklogTracker> entry : backLogStreams.entrySet()) {
-                int allocation = entry.getValue().getUnusedAllocation();
+            for (AbstractStream stream : backLogStreams) {
+                int allocation = stream.getUnusedAllocation();
                 if (allocation > 0) {
                     backLogSize -= allocation;
-                    if (!entry.getValue().isNotifyInProgress()) {
-                        result.add(entry.getKey());
-                        entry.getValue().startNotify();
+                    if (!stream.isNotifyInProgress()) {
+                        result.add(stream);
+                        stream.startNotify();
                     }
                 }
             }
@@ -1061,10 +1054,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(),
                     stream.getIdAsString(), Integer.toString(allocation)));
         }
-        // Allocate to the specified stream
-        BacklogTracker tracker = backLogStreams.get(stream);
 
-        int leftToAllocate = tracker.allocate(allocation);
+        int leftToAllocate = stream.allocate(allocation);
 
         if (leftToAllocate == 0) {
             return 0;
@@ -1078,12 +1069,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         // Recipients are children of the current stream that are in the
         // backlog.
         Set<AbstractStream> recipients = new HashSet<>(stream.getChildStreams());
-        recipients.retainAll(backLogStreams.keySet());
+        recipients.retainAll(backLogStreams);
 
         // Loop until we run out of allocation or recipients
         while (leftToAllocate > 0) {
             if (recipients.size() == 0) {
-                if (tracker.getUnusedAllocation() == 0) {
+                if (stream.getUnusedAllocation() == 0) {
                     backLogStreams.remove(stream);
                 }
                 return leftToAllocate;
@@ -1832,8 +1823,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             if (average < overheadThreshold) {
                 // For Streams, client might only release the minimum so check
                 // against current demand
-                BacklogTracker tracker = backLogStreams.get(stream);
-                if (tracker == null || increment < tracker.getRemainingReservation()) {
+                if (increment < stream.getRemainingReservation()) {
                     // The smaller the increment, the larger the overhead
                     increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average);
                 }
@@ -2044,80 +2034,4 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
             payload = ByteBuffer.allocate(payload.capacity() * 2);
         }
     }
-
-
-    private static class BacklogTracker {
-
-        private int remainingReservation;
-        private int unusedAllocation;
-        private boolean notifyInProgress;
-
-        public BacklogTracker() {
-        }
-
-        public BacklogTracker(int reservation) {
-            remainingReservation = reservation;
-        }
-
-        /**
-         * @return The number of bytes requiring an allocation from the
-         *         Connection flow control window
-         */
-        public int getRemainingReservation() {
-            return remainingReservation;
-        }
-
-        /**
-         *
-         * @return The number of bytes allocated from the Connection flow
-         *         control window but not yet written
-         */
-        public int getUnusedAllocation() {
-            return unusedAllocation;
-        }
-
-        /**
-         * The purpose of this is to avoid the incorrect triggering of a timeout
-         * for the following sequence of events:
-         * <ol>
-         * <li>window update 1</li>
-         * <li>allocation 1</li>
-         * <li>notify 1</li>
-         * <li>window update 2</li>
-         * <li>allocation 2</li>
-         * <li>act on notify 1 (using allocation 1 and 2)</li>
-         * <li>notify 2</li>
-         * <li>act on notify 2 (timeout due to no allocation)</li>
-         * </ol>
-         *
-         * @return {@code true} if a notify has been issued but the associated
-         *         allocation has not been used, otherwise {@code false}
-         */
-        public boolean isNotifyInProgress() {
-            return notifyInProgress;
-        }
-
-        public void useAllocation() {
-            unusedAllocation = 0;
-            notifyInProgress = false;
-        }
-
-        public void startNotify() {
-            notifyInProgress = true;
-        }
-
-        private int allocate(int allocation) {
-            if (remainingReservation >= allocation) {
-                remainingReservation -= allocation;
-                unusedAllocation += allocation;
-                return 0;
-            }
-
-            int left = allocation - remainingReservation;
-            unusedAllocation += remainingReservation;
-            remainingReservation = 0;
-
-            return left;
-        }
-    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org