You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/05/21 22:47:05 UTC
svn commit: r1680952 - in /tomcat/trunk/java/org/apache/coyote/http2:
AbstractStream.java Http2UpgradeHandler.java Stream.java
Author: markt
Date: Thu May 21 20:47:04 2015
New Revision: 1680952
URL: http://svn.apache.org/r1680952
Log:
Implement flow control if the connection runs out of capacity.
Needs some unit tests (once I figure out the best way to write them).
Modified:
tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/Stream.java
Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1680952&r1=1680951&r2=1680952&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu May 21 20:47:04 2015
@@ -17,25 +17,20 @@
package org.apache.coyote.http2;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.juli.logging.Log;
-import org.apache.tomcat.util.res.StringManager;
/**
* Used to manage prioritisation.
*/
abstract class AbstractStream {
- private static final StringManager sm = StringManager.getManager(AbstractStream.class);
-
private final Integer identifier;
private volatile AbstractStream parentStream = null;
private final Set<AbstractStream> childStreams = new HashSet<>();
- private volatile int weight = Constants.DEFAULT_WEIGHT;
private AtomicLong windowSize = new AtomicLong(ConnectionSettings.DEFAULT_WINDOW_SIZE);
public Integer getIdentifier() {
@@ -48,34 +43,6 @@ abstract class AbstractStream {
}
- public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) {
- if (getLog().isDebugEnabled()) {
- getLog().debug(sm.getString("abstractStream.reprioritisation.debug",
- Long.toString(getConnectionId()), identifier, Boolean.toString(exclusive),
- parent.getIdentifier(), Integer.toString(weight)));
- }
-
- // Check if new parent is a descendant of this stream
- if (isDescendant(parent)) {
- parent.detachFromParent();
- parentStream.addChild(parent);
- }
-
- if (exclusive) {
- // Need to move children of the new parent to be children of this
- // stream. Slightly convoluted to avoid concurrent modification.
- Iterator<AbstractStream> parentsChildren = parent.getChildStreams().iterator();
- while (parentsChildren.hasNext()) {
- AbstractStream parentsChild = parentsChildren.next();
- parentsChildren.remove();
- this.addChild(parentsChild);
- }
- }
- parent.addChild(this);
- this.weight = weight;
- }
-
-
void detachFromParent() {
if (parentStream != null) {
parentStream.getChildStreams().remove(this);
@@ -138,17 +105,9 @@ abstract class AbstractStream {
}
- protected int reserveWindowSize(int reservation) {
- long windowSize = this.windowSize.get();
- if (reservation > windowSize) {
- return (int) windowSize;
- } else {
- return reservation;
- }
- }
-
-
protected abstract Log getLog();
protected abstract int getConnectionId();
+
+ /**
+ * Obtain the weight of this stream. Http2UpgradeHandler.allocate() uses the
+ * value to split an allocation between sibling streams in proportion to
+ * their weights.
+ *
+ * @return the weight of this stream
+ */
+ protected abstract int getWeight();
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680952&r1=1680951&r2=1680952&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 21 20:47:04 2015
@@ -21,9 +21,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.WebConnection;
@@ -101,7 +104,11 @@ public class Http2UpgradeHandler extends
private final Map<Integer,Stream> streams = new HashMap<>();
- private final Queue<Object> writeQueue = new ConcurrentLinkedQueue<>();
+ // Tracking for when the connection is blocked (windowSize < 1)
+ // Monitor held by reserveWindowSize() and incrementWindowSize() while the
+ // back log state below is read or updated.
+ private final Object backLogLock = new Object();
+ // Streams (and their ancestors) waiting for connection window. Each value
+ // is a two element array: [0] is the number of bytes requested but not yet
+ // allocated, [1] is the number of bytes allocated but not yet collected.
+ private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>();
+ // Running total of the bytes requested by the streams in the back log.
+ private long backLogSize = 0;
+
public Http2UpgradeHandler(Adapter adapter) {
super (STREAM_ID_ZERO);
@@ -716,7 +723,6 @@ public class Http2UpgradeHandler extends
stream.getIdentifier(), Integer.toString(data.remaining())));
}
synchronized (socketWrapper) {
- // TODO Manage window sizes
byte[] header = new byte[9];
ByteUtil.setThreeBytes(header, 0, len);
header[3] = FRAME_TYPE_DATA;
@@ -737,28 +743,137 @@ public class Http2UpgradeHandler extends
socketWrapper.registerWriteInterest();
return;
}
+ }
- Object obj;
- while ((obj = getThingToWrite()) != null) {
- // TODO
- log.debug("TODO: write [" + obj.toString() + "]");
+
+    /**
+     * Reserve capacity from the connection level flow control window on
+     * behalf of a stream.
+     * <p>
+     * If the connection window is exhausted, or other streams are already
+     * waiting, the request is recorded in the back log and zero is returned.
+     * The caller is expected to wait on the stream and call this method again
+     * once notified. If an allocation has been granted to the stream in the
+     * meantime, that allocation is returned and the stream leaves the back
+     * log.
+     *
+     * @param stream  the stream that wants to write
+     * @param toWrite the number of bytes the stream would like to write
+     *
+     * @return the number of bytes the stream may write now, possibly zero
+     */
+    int reserveWindowSize(Stream stream, int toWrite) {
+        int result;
+        synchronized (backLogLock) {
+            long windowSize = getWindowSize();
+            if (windowSize < 1 || backLogSize > 0) {
+                // Has this stream been granted an allocation
+                int[] value = backLogStreams.remove(stream);
+                if (value != null && value[1] > 0) {
+                    // The entry has already been removed from the back log so
+                    // the allocation only needs to be handed over. Any part of
+                    // the request that was not satisfied is no longer tracked
+                    // so it must not remain in the total. The stream will ask
+                    // again for whatever it still needs.
+                    result = value[1];
+                    backLogSize -= value[0];
+                } else {
+                    if (value == null) {
+                        // First request from this stream. remove() returns
+                        // null in this case so the entry has to be created.
+                        value = new int[] { toWrite, 0 };
+                        backLogSize += toWrite;
+                    }
+                    // A stream that was woken without an allocation keeps its
+                    // existing entry so its request is not counted twice.
+                    backLogStreams.put(stream, value);
+                    // Add the parents as well
+                    AbstractStream parent = stream.getParentStream();
+                    while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) {
+                        parent = parent.getParentStream();
+                    }
+                    result = 0;
+                }
+            } else if (windowSize < toWrite) {
+                result = (int) windowSize;
+            } else {
+                result = toWrite;
+            }
+            incrementWindowSize(-result);
         }
+        return result;
     }
- private Object getThingToWrite() {
- // TODO This is more complicated than pulling an object off a queue.
- // Note: The checking of the queue for something to write and the
- // calling of endWrite() if nothing is found must be kept
- // within the same sync to avoid race conditions with adding
- // entries to the queue.
- return writeQueue.poll();
+    /**
+     * Adjust the connection level flow control window. When an exhausted
+     * window is increased, streams waiting in the back log are given a share
+     * of the new capacity before the window itself is updated.
+     *
+     * @param increment the change to apply to the window; negative when
+     *                  capacity is being consumed
+     */
+    @Override
+    protected void incrementWindowSize(int increment) {
+        synchronized (backLogLock) {
+            // Only a genuine increase can release waiting streams.
+            // reserveWindowSize() calls this method with zero and negative
+            // values which must not be passed on as an allocation.
+            if (increment > 0 && getWindowSize() == 0) {
+                releaseBackLog(increment);
+            }
+            super.incrementWindowSize(increment);
+        }
+    }
+
+
+    /**
+     * Distribute newly available connection window to the streams in the
+     * back log and wake the streams that received some of it. Called from
+     * incrementWindowSize() while backLogLock is held.
+     *
+     * @param increment the number of bytes that have become available
+     */
+    private void releaseBackLog(int increment) {
+        if (backLogSize < increment) {
+            // Can clear the whole backlog
+            for (AbstractStream stream : backLogStreams.keySet()) {
+                synchronized (stream) {
+                    stream.notifyAll();
+                }
+            }
+            backLogStreams.clear();
+            backLogSize = 0;
+        } else {
+            // Hand out the increment exactly once, starting at the connection
+            // and working down the priority tree.
+            int leftToAllocate = increment;
+            while (leftToAllocate > 0) {
+                leftToAllocate = allocate(this, leftToAllocate);
+            }
+            for (Entry<AbstractStream,int[]> entry : backLogStreams.entrySet()) {
+                int allocation = entry.getValue()[1];
+                if (allocation > 0) {
+                    // Allocated bytes are no longer outstanding
+                    backLogSize -= allocation;
+                    synchronized (entry.getKey()) {
+                        entry.getKey().notifyAll();
+                    }
+                }
+            }
+        }
 }
- void addWrite(Object obj) {
- writeQueue.add(obj);
+    /**
+     * Allocate connection window to a stream in the back log and, if the
+     * stream does not need all of it, to those of its children that are also
+     * in the back log, in proportion to their weights.
+     *
+     * @param stream     the stream to allocate to; must be in the back log
+     * @param allocation the number of bytes available for allocation
+     *
+     * @return the number of bytes that could not be allocated to the stream
+     *         or its descendants
+     */
+    private int allocate(AbstractStream stream, int allocation) {
+        // Allocate to the specified stream
+        int[] value = backLogStreams.get(stream);
+        if (value[0] >= allocation) {
+            value[0] -= allocation;
+            // Accumulate. The stream may already hold an allocation from an
+            // earlier round that it has not collected yet.
+            value[1] += allocation;
+            return 0;
+        }
+
+        // There was some left over so allocate that to the children of the
+        // stream.
+        int leftToAllocate = allocation;
+        value[1] += value[0];
+        leftToAllocate -= value[0];
+        value[0] = 0;
+
+        // Recipients are children of the current stream that are in the
+        // backlog.
+        Set<AbstractStream> recipients = new HashSet<>();
+        recipients.addAll(stream.getChildStreams());
+        recipients.retainAll(backLogStreams.keySet());
+
+        // Loop until we run out of allocation or recipients
+        while (leftToAllocate > 0) {
+            if (recipients.size() == 0) {
+                // Only drop the stream from the back log if it has nothing
+                // to collect. Otherwise it would never be notified and its
+                // allocation would be lost.
+                if (value[1] == 0) {
+                    backLogStreams.remove(stream);
+                }
+                return leftToAllocate;
+            }
+
+            int totalWeight = 0;
+            for (AbstractStream recipient : recipients) {
+                totalWeight += recipient.getWeight();
+            }
+
+            // Use an Iterator so fully allocated children/recipients can be
+            // removed.
+            Iterator<AbstractStream> iter = recipients.iterator();
+            while (iter.hasNext()) {
+                AbstractStream recipient = iter.next();
+                // +1 is to avoid rounding issues triggering an infinite loop.
+                // Will cause a very slight over allocation but HTTP/2 should
+                // cope with that.
+                // The product is calculated as a long since a large window
+                // multiplied by a weight can exceed the range of an int.
+                long weighted = (long) leftToAllocate * recipient.getWeight() / totalWeight;
+                int share = (int) Math.min(Integer.MAX_VALUE, 1 + weighted);
+                int remainder = allocate(recipient, share);
+                // Remove recipients that receive their full allocation so that
+                // they are excluded from the next allocation round.
+                if (remainder > 0) {
+                    iter.remove();
+                }
+                leftToAllocate -= (share - remainder);
+            }
+        }
+
+        return 0;
 }
@@ -801,4 +916,10 @@ public class Http2UpgradeHandler extends
protected final Log getLog() {
return log;
}
+
+
+ /**
+ * The connection always reports a weight of zero.
+ * NOTE(review): presumably never used in a calculation since allocate()
+ * only reads the weight of child streams - confirm.
+ *
+ * @return always zero
+ */
+ @Override
+ protected final int getWeight() {
+ return 0;
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680952&r1=1680951&r2=1680952&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu May 21 20:47:04 2015
@@ -18,6 +18,7 @@ package org.apache.coyote.http2;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Request;
@@ -33,6 +34,8 @@ public class Stream extends AbstractStre
private static final Log log = LogFactory.getLog(Stream.class);
private static final StringManager sm = StringManager.getManager(Stream.class);
+ private volatile int weight = Constants.DEFAULT_WEIGHT;
+
private final Http2UpgradeHandler handler;
private final Request coyoteRequest = new Request();
private final Response coyoteResponse = new Response();
@@ -48,6 +51,34 @@ public class Stream extends AbstractStre
}
+ /**
+ * Move this stream to a new position in the priority tree and record its
+ * new weight.
+ *
+ * @param parent    the stream that will become the parent of this stream
+ * @param exclusive if true, the current children of the new parent become
+ *                  children of this stream
+ * @param weight    the new weight for this stream
+ */
+ public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) {
+ if (getLog().isDebugEnabled()) {
+ getLog().debug(sm.getString("abstractStream.reprioritisation.debug",
+ Long.toString(getConnectionId()), getIdentifier(), Boolean.toString(exclusive),
+ parent.getIdentifier(), Integer.toString(weight)));
+ }
+
+ // Check if new parent is a descendant of this stream
+ if (isDescendant(parent)) {
+ // Move the new parent up to be a child of this stream's current
+ // parent before this stream is attached to it.
+ parent.detachFromParent();
+ getParentStream().addChild(parent);
+ }
+
+ if (exclusive) {
+ // Need to move children of the new parent to be children of this
+ // stream. Slightly convoluted to avoid concurrent modification.
+ Iterator<AbstractStream> parentsChildren = parent.getChildStreams().iterator();
+ while (parentsChildren.hasNext()) {
+ AbstractStream parentsChild = parentsChildren.next();
+ parentsChildren.remove();
+ this.addChild(parentsChild);
+ }
+ }
+ // Attach this stream to its new parent and store the new weight
+ parent.addChild(this);
+ this.weight = weight;
+ }
+
+
@Override
public void incrementWindowSize(int windowSizeIncrement) {
// If this is zero then any thread that has been trying to write for
@@ -64,6 +95,16 @@ public class Stream extends AbstractStre
}
+    /**
+     * Determine how much of a requested reservation fits in the current flow
+     * control window of this stream. The window itself is not modified.
+     *
+     * @param reservation the number of bytes the caller would like to write
+     *
+     * @return the reservation, or the current window size if that is smaller
+     */
+    private int reserveWindowSize(int reservation) {
+        long available = getWindowSize();
+        return (reservation > available) ? (int) available : reservation;
+    }
+
+
@Override
public void emitHeader(String name, String value, boolean neverIndex) {
if (log.isDebugEnabled()) {
@@ -142,6 +183,12 @@ public class Stream extends AbstractStre
}
+ /**
+ * @return the current weight of this stream; initially
+ *         Constants.DEFAULT_WEIGHT and updated by rePrioritise()
+ */
+ @Override
+ protected int getWeight() {
+ return weight;
+ }
+
+
Request getCoyoteRequest() {
return coyoteRequest;
}
@@ -201,12 +248,12 @@ public class Stream extends AbstractStre
thisWriteStream = reserveWindowSize(left);
if (thisWriteStream < 1) {
// Need to block until a WindowUpdate message is
- // processed for this stream;
+ // processed for this stream
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
- // TODO. Possible shutdown?
+ // TODO: Possible shutdown?
}
}
}
@@ -215,14 +262,21 @@ public class Stream extends AbstractStre
// Flow control for the connection
int thisWrite;
do {
- thisWrite = handler.reserveWindowSize(thisWriteStream);
+ thisWrite = handler.reserveWindowSize(Stream.this, thisWriteStream);
if (thisWrite < 1) {
- // TODO Flow control when connection window is exhausted
+ // Need to block until a WindowUpdate message is
+ // processed for this connection
+ synchronized (this) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // TODO: Possible shutdown?
+ }
+ }
}
} while (thisWrite < 1);
incrementWindowSize(-thisWrite);
- handler.incrementWindowSize(-thisWrite);
// Do the write
handler.writeBody(Stream.this, buffer, thisWrite);
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org