You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2021/09/13 16:29:24 UTC
[tomcat] 02/02: Revert "Step 1 - merge BackLogTracker into AbstractStream"
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit aca188a85b607f5fad3457619ed453ba0b7180a7
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Sep 13 17:28:16 2021 +0100
Revert "Step 1 - merge BackLogTracker into AbstractStream"
This reverts commit 74681f56475541ff6069aadb1a88bd4d687e7446.
---
java/org/apache/coyote/http2/AbstractStream.java | 65 -----------
.../apache/coyote/http2/Http2UpgradeHandler.java | 128 +++++++++++++++++----
2 files changed, 107 insertions(+), 86 deletions(-)
diff --git a/java/org/apache/coyote/http2/AbstractStream.java b/java/org/apache/coyote/http2/AbstractStream.java
index d3195eb..c7374b6 100644
--- a/java/org/apache/coyote/http2/AbstractStream.java
+++ b/java/org/apache/coyote/http2/AbstractStream.java
@@ -40,9 +40,6 @@ abstract class AbstractStream {
private final Set<AbstractNonZeroStream> childStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
private long windowSize = ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE;
- volatile int remainingReservation;
- volatile int unusedAllocation;
- volatile boolean notifyInProgress;
AbstractStream(Integer identifier) {
this.identifier = identifier;
@@ -160,66 +157,4 @@ abstract class AbstractStream {
abstract String getConnectionId();
abstract int getWeight();
-
-
- /**
- * @return The number of bytes requiring an allocation from the
- * Connection flow control window
- */
- public int getRemainingReservation() {
- return remainingReservation;
- }
-
- /**
- *
- * @return The number of bytes allocated from the Connection flow
- * control window but not yet written
- */
- public int getUnusedAllocation() {
- return unusedAllocation;
- }
-
- /**
- * The purpose of this is to avoid the incorrect triggering of a timeout
- * for the following sequence of events:
- * <ol>
- * <li>window update 1</li>
- * <li>allocation 1</li>
- * <li>notify 1</li>
- * <li>window update 2</li>
- * <li>allocation 2</li>
- * <li>act on notify 1 (using allocation 1 and 2)</li>
- * <li>notify 2</li>
- * <li>act on notify 2 (timeout due to no allocation)</li>
- * </ol>
- *
- * @return {@code true} if a notify has been issued but the associated
- * allocation has not been used, otherwise {@code false}
- */
- public boolean isNotifyInProgress() {
- return notifyInProgress;
- }
-
- public void useAllocation() {
- unusedAllocation = 0;
- notifyInProgress = false;
- }
-
- public void startNotify() {
- notifyInProgress = true;
- }
-
- protected int allocate(int allocation) {
- if (remainingReservation >= allocation) {
- remainingReservation -= allocation;
- unusedAllocation += allocation;
- return 0;
- }
-
- int left = allocation - remainingReservation;
- unusedAllocation += remainingReservation;
- remainingReservation = 0;
-
- return left;
- }
}
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index b45dad0..f28ae6d 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -20,9 +20,10 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
@@ -131,7 +132,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
private final PingManager pingManager = getPingManager();
private volatile int newStreamsSinceLastPrune = 0;
- private final Set<AbstractStream> backLogStreams = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Map<AbstractStream, BacklogTracker> backLogStreams = new ConcurrentHashMap<>();
private long backLogSize = 0;
// The time at which the connection will timeout unless data arrives before
// then. -1 means no timeout.
@@ -881,20 +882,21 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
long windowSize = getWindowSize();
if (windowSize < 1 || backLogSize > 0) {
// Has this stream been granted an allocation
- if (stream.remainingReservation == 0) {
- backLogStreams.add(stream);
- stream.remainingReservation = reservation;
+ BacklogTracker tracker = backLogStreams.get(stream);
+ if (tracker == null) {
+ tracker = new BacklogTracker(reservation);
+ backLogStreams.put(stream, tracker);
backLogSize += reservation;
// Add the parents as well
AbstractStream parent = stream.getParentStream();
- while (parent != null && backLogStreams.add(parent)) {
+ while (parent != null && backLogStreams.putIfAbsent(parent, new BacklogTracker()) == null) {
parent = parent.getParentStream();
}
} else {
- if (stream.getUnusedAllocation() > 0) {
- allocation = stream.getUnusedAllocation();
+ if (tracker.getUnusedAllocation() > 0) {
+ allocation = tracker.getUnusedAllocation();
decrementWindowSize(allocation);
- if (stream.getRemainingReservation() == 0) {
+ if (tracker.getRemainingReservation() == 0) {
// The reservation has been fully allocated
// so this stream can be removed from the
// backlog.
@@ -903,7 +905,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
// This allocation has been used. Leave the
// stream on the backlog as it still has
// more bytes to write.
- stream.useAllocation();
+ tracker.useAllocation();
}
}
}
@@ -926,7 +928,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
// Has this stream been granted an allocation
// Note: If the stream in not in this Map then the
// requested write has been fully allocated
- if (stream.getUnusedAllocation() == 0) {
+ BacklogTracker tracker;
+ // Ensure allocations made in other threads are visible
+ synchronized (this) {
+ tracker = backLogStreams.get(stream);
+ }
+ if (tracker != null && tracker.getUnusedAllocation() == 0) {
String msg;
Http2Error error;
if (stream.isActive()) {
@@ -1026,7 +1033,7 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
Set<AbstractStream> result = new HashSet<>();
if (backLogSize < increment) {
// Can clear the whole backlog
- result.addAll(backLogStreams);
+ result.addAll(backLogStreams.keySet());
backLogStreams.clear();
backLogSize = 0;
} else {
@@ -1034,13 +1041,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
while (leftToAllocate > 0) {
leftToAllocate = allocate(this, leftToAllocate);
}
- for (AbstractStream stream : backLogStreams) {
- int allocation = stream.getUnusedAllocation();
+ for (Entry<AbstractStream,BacklogTracker> entry : backLogStreams.entrySet()) {
+ int allocation = entry.getValue().getUnusedAllocation();
if (allocation > 0) {
backLogSize -= allocation;
- if (!stream.isNotifyInProgress()) {
- result.add(stream);
- stream.startNotify();
+ if (!entry.getValue().isNotifyInProgress()) {
+ result.add(entry.getKey());
+ entry.getValue().startNotify();
}
}
}
@@ -1054,8 +1061,10 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(),
stream.getIdAsString(), Integer.toString(allocation)));
}
+ // Allocate to the specified stream
+ BacklogTracker tracker = backLogStreams.get(stream);
- int leftToAllocate = stream.allocate(allocation);
+ int leftToAllocate = tracker.allocate(allocation);
if (leftToAllocate == 0) {
return 0;
@@ -1069,12 +1078,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
// Recipients are children of the current stream that are in the
// backlog.
Set<AbstractStream> recipients = new HashSet<>(stream.getChildStreams());
- recipients.retainAll(backLogStreams);
+ recipients.retainAll(backLogStreams.keySet());
// Loop until we run out of allocation or recipients
while (leftToAllocate > 0) {
if (recipients.size() == 0) {
- if (stream.getUnusedAllocation() == 0) {
+ if (tracker.getUnusedAllocation() == 0) {
backLogStreams.remove(stream);
}
return leftToAllocate;
@@ -1823,7 +1832,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
if (average < overheadThreshold) {
// For Streams, client might only release the minimum so check
// against current demand
- if (increment < stream.getRemainingReservation()) {
+ BacklogTracker tracker = backLogStreams.get(stream);
+ if (tracker == null || increment < tracker.getRemainingReservation()) {
// The smaller the increment, the larger the overhead
increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average);
}
@@ -2034,4 +2044,80 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
payload = ByteBuffer.allocate(payload.capacity() * 2);
}
}
+
+
+ private static class BacklogTracker {
+
+ private int remainingReservation;
+ private int unusedAllocation;
+ private boolean notifyInProgress;
+
+ public BacklogTracker() {
+ }
+
+ public BacklogTracker(int reservation) {
+ remainingReservation = reservation;
+ }
+
+ /**
+ * @return The number of bytes requiring an allocation from the
+ * Connection flow control window
+ */
+ public int getRemainingReservation() {
+ return remainingReservation;
+ }
+
+ /**
+ *
+ * @return The number of bytes allocated from the Connection flow
+ * control window but not yet written
+ */
+ public int getUnusedAllocation() {
+ return unusedAllocation;
+ }
+
+ /**
+ * The purpose of this is to avoid the incorrect triggering of a timeout
+ * for the following sequence of events:
+ * <ol>
+ * <li>window update 1</li>
+ * <li>allocation 1</li>
+ * <li>notify 1</li>
+ * <li>window update 2</li>
+ * <li>allocation 2</li>
+ * <li>act on notify 1 (using allocation 1 and 2)</li>
+ * <li>notify 2</li>
+ * <li>act on notify 2 (timeout due to no allocation)</li>
+ * </ol>
+ *
+ * @return {@code true} if a notify has been issued but the associated
+ * allocation has not been used, otherwise {@code false}
+ */
+ public boolean isNotifyInProgress() {
+ return notifyInProgress;
+ }
+
+ public void useAllocation() {
+ unusedAllocation = 0;
+ notifyInProgress = false;
+ }
+
+ public void startNotify() {
+ notifyInProgress = true;
+ }
+
+ private int allocate(int allocation) {
+ if (remainingReservation >= allocation) {
+ remainingReservation -= allocation;
+ unusedAllocation += allocation;
+ return 0;
+ }
+
+ int left = allocation - remainingReservation;
+ unusedAllocation += remainingReservation;
+ remainingReservation = 0;
+
+ return left;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org