You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/05/21 22:47:05 UTC

svn commit: r1680952 - in /tomcat/trunk/java/org/apache/coyote/http2: AbstractStream.java Http2UpgradeHandler.java Stream.java

Author: markt
Date: Thu May 21 20:47:04 2015
New Revision: 1680952

URL: http://svn.apache.org/r1680952
Log:
Implement flow control if the connection runs out of capacity.
Needs some unit tests (once I figure out the best way to write them).

Modified:
    tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java

Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1680952&r1=1680951&r2=1680952&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Thu May 21 20:47:04 2015
@@ -17,25 +17,20 @@
 package org.apache.coyote.http2;
 
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.juli.logging.Log;
-import org.apache.tomcat.util.res.StringManager;
 
 /**
  * Used to managed prioritisation.
  */
 abstract class AbstractStream {
 
-    private static final StringManager sm = StringManager.getManager(AbstractStream.class);
-
     private final Integer identifier;
 
     private volatile AbstractStream parentStream = null;
     private final Set<AbstractStream> childStreams = new HashSet<>();
-    private volatile int weight = Constants.DEFAULT_WEIGHT;
     private AtomicLong windowSize = new AtomicLong(ConnectionSettings.DEFAULT_WINDOW_SIZE);
 
     public Integer getIdentifier() {
@@ -48,34 +43,6 @@ abstract class AbstractStream {
     }
 
 
-    public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) {
-        if (getLog().isDebugEnabled()) {
-            getLog().debug(sm.getString("abstractStream.reprioritisation.debug",
-                    Long.toString(getConnectionId()), identifier, Boolean.toString(exclusive),
-                    parent.getIdentifier(), Integer.toString(weight)));
-        }
-
-        // Check if new parent is a descendant of this stream
-        if (isDescendant(parent)) {
-            parent.detachFromParent();
-            parentStream.addChild(parent);
-        }
-
-        if (exclusive) {
-            // Need to move children of the new parent to be children of this
-            // stream. Slightly convoluted to avoid concurrent modification.
-            Iterator<AbstractStream> parentsChildren = parent.getChildStreams().iterator();
-            while (parentsChildren.hasNext()) {
-                AbstractStream parentsChild = parentsChildren.next();
-                parentsChildren.remove();
-                this.addChild(parentsChild);
-            }
-        }
-        parent.addChild(this);
-        this.weight = weight;
-    }
-
-
     void detachFromParent() {
         if (parentStream != null) {
             parentStream.getChildStreams().remove(this);
@@ -138,17 +105,9 @@ abstract class AbstractStream {
     }
 
 
-    protected int reserveWindowSize(int reservation) {
-        long windowSize = this.windowSize.get();
-        if (reservation > windowSize) {
-            return (int) windowSize;
-        } else {
-            return reservation;
-        }
-    }
-
-
     protected abstract Log getLog();
 
     protected abstract int getConnectionId();
+
+    protected abstract int getWeight();
 }

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680952&r1=1680951&r2=1680952&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 21 20:47:04 2015
@@ -21,9 +21,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.servlet.http.WebConnection;
@@ -101,7 +104,11 @@ public class Http2UpgradeHandler extends
 
     private final Map<Integer,Stream> streams = new HashMap<>();
 
-    private final Queue<Object> writeQueue = new ConcurrentLinkedQueue<>();
+    // Tracking for when the connection is blocked (windowSize < 1)
+    private final Object backLogLock = new Object();
+    private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>();
+    private long backLogSize = 0;
+
 
     public Http2UpgradeHandler(Adapter adapter) {
         super (STREAM_ID_ZERO);
@@ -716,7 +723,6 @@ public class Http2UpgradeHandler extends
                     stream.getIdentifier(), Integer.toString(data.remaining())));
         }
         synchronized (socketWrapper) {
-            // TODO Manage window sizes
             byte[] header = new byte[9];
             ByteUtil.setThreeBytes(header, 0, len);
             header[3] = FRAME_TYPE_DATA;
@@ -737,28 +743,137 @@ public class Http2UpgradeHandler extends
             socketWrapper.registerWriteInterest();
             return;
         }
+    }
 
-        Object obj;
-        while ((obj = getThingToWrite()) != null) {
-            // TODO
-            log.debug("TODO: write [" + obj.toString() + "]");
+
+    int reserveWindowSize(Stream stream, int toWrite) {
+        int result;
+        synchronized (backLogLock) {
+            long windowSize = getWindowSize();
+            if (windowSize < 1 || backLogSize > 0) {
+                // Has this stream been granted an allocation
+                int[] value = backLogStreams.remove(stream);
+                if (value[1] > 0) {
+                    result = value[1];
+                    value[0] = 0;
+                    value[1] = 1;
+                } else {
+                    value = new int[] { toWrite, 0 };
+                    backLogStreams.put(stream, value);
+                    backLogSize += toWrite;
+                    // Add the parents as well
+                    AbstractStream parent = stream.getParentStream();
+                    while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) {
+                        parent = parent.getParentStream();
+                    }
+                    result = 0;
+                }
+            } else if (windowSize < toWrite) {
+                result = (int) windowSize;
+            } else {
+                result = toWrite;
+            }
+            incrementWindowSize(-result);
         }
+        return result;
     }
 
 
-    private Object getThingToWrite() {
-        // TODO This is more complicated than pulling an object off a queue.
 
-        // Note: The checking of the queue for something to write and the
-        //       calling of endWrite() if nothing is found must be kept
-        //       within the same sync to avoid race conditions with adding
-        //       entries to the queue.
-        return writeQueue.poll();
+    @Override
+    protected void incrementWindowSize(int increment) {
+        synchronized (backLogLock) {
+            if (getWindowSize() == 0) {
+                releaseBackLog(increment);
+            }
+            super.incrementWindowSize(increment);
+        }
+    }
+
+
+    private void releaseBackLog(int increment) {
+        if (backLogSize < increment) {
+            // Can clear the whole backlog
+            for (AbstractStream stream : backLogStreams.keySet()) {
+                synchronized (stream) {
+                    stream.notifyAll();
+                }
+            }
+            backLogStreams.clear();
+            backLogSize = 0;
+        } else {
+            int leftToAllocate = increment;
+            while (leftToAllocate > 0) {
+                leftToAllocate = allocate(this, leftToAllocate);
+            }
+            allocate(this, increment);
+            for (Entry<AbstractStream,int[]> entry : backLogStreams.entrySet()) {
+                int allocation = entry.getValue()[1];
+                if (allocation > 0) {
+                    backLogSize =- allocation;
+                    synchronized (entry.getKey()) {
+                        entry.getKey().notifyAll();
+                    }
+                }
+            }
+        }
     }
 
 
-    void addWrite(Object obj) {
-        writeQueue.add(obj);
+    private int allocate(AbstractStream stream, int allocation) {
+        // Allocate to the specified stream
+        int[] value = backLogStreams.get(stream);
+        if (value[0] >= allocation) {
+            value[0] -= allocation;
+            value[1] = allocation;
+            return 0;
+        }
+
+        // There was some left over so allocate that to the children of the
+        // stream.
+        int leftToAllocate = allocation;
+        value[1] = value[0];
+        value[0] = 0;
+        leftToAllocate -= value[1];
+
+        // Recipients are children of the current stream that are in the
+        // backlog.
+        Set<AbstractStream> recipients = new HashSet<>();
+        recipients.addAll(stream.getChildStreams());
+        recipients.retainAll(backLogStreams.keySet());
+
+        // Loop until we run out of allocation or recipients
+        while (leftToAllocate > 0) {
+            if (recipients.size() == 0) {
+                backLogStreams.remove(stream);
+                return leftToAllocate;
+            }
+
+            int totalWeight = 0;
+            for (AbstractStream recipient : recipients) {
+                totalWeight += recipient.getWeight();
+            }
+
+            // Use an Iterator so fully allocated children/recipients can be
+            // removed.
+            Iterator<AbstractStream> iter = recipients.iterator();
+            while (iter.hasNext()) {
+                AbstractStream recipient = iter.next();
+                // +1 is to avoid rounding issues triggering an infinite loop.
+                // Will cause a very slight over allocation but HTTP/2 should
+                // cope with that.
+                int share = 1 + leftToAllocate * recipient.getWeight() / totalWeight;
+                int remainder = allocate(recipient, share);
+                // Remove recipients that receive their full allocation so that
+                // they are excluded from the next allocation round.
+                if (remainder > 0) {
+                    iter.remove();
+                }
+                leftToAllocate -= (share - remainder);
+            }
+        }
+
+        return 0;
     }
 
 
@@ -801,4 +916,10 @@ public class Http2UpgradeHandler extends
     protected final Log getLog() {
         return log;
     }
+
+
+    @Override
+    protected final int getWeight() {
+        return 0;
+    }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680952&r1=1680951&r2=1680952&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu May 21 20:47:04 2015
@@ -18,6 +18,7 @@ package org.apache.coyote.http2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Request;
@@ -33,6 +34,8 @@ public class Stream extends AbstractStre
     private static final Log log = LogFactory.getLog(Stream.class);
     private static final StringManager sm = StringManager.getManager(Stream.class);
 
+    private volatile int weight = Constants.DEFAULT_WEIGHT;
+
     private final Http2UpgradeHandler handler;
     private final Request coyoteRequest = new Request();
     private final Response coyoteResponse = new Response();
@@ -48,6 +51,34 @@ public class Stream extends AbstractStre
     }
 
 
+    public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) {
+        if (getLog().isDebugEnabled()) {
+            getLog().debug(sm.getString("abstractStream.reprioritisation.debug",
+                    Long.toString(getConnectionId()), getIdentifier(), Boolean.toString(exclusive),
+                    parent.getIdentifier(), Integer.toString(weight)));
+        }
+
+        // Check if new parent is a descendant of this stream
+        if (isDescendant(parent)) {
+            parent.detachFromParent();
+            getParentStream().addChild(parent);
+        }
+
+        if (exclusive) {
+            // Need to move children of the new parent to be children of this
+            // stream. Slightly convoluted to avoid concurrent modification.
+            Iterator<AbstractStream> parentsChildren = parent.getChildStreams().iterator();
+            while (parentsChildren.hasNext()) {
+                AbstractStream parentsChild = parentsChildren.next();
+                parentsChildren.remove();
+                this.addChild(parentsChild);
+            }
+        }
+        parent.addChild(this);
+        this.weight = weight;
+    }
+
+
     @Override
     public void incrementWindowSize(int windowSizeIncrement) {
         // If this is zero then any thread that has been trying to write for
@@ -64,6 +95,16 @@ public class Stream extends AbstractStre
     }
 
 
+    private int reserveWindowSize(int reservation) {
+        long windowSize = getWindowSize();
+        if (reservation > windowSize) {
+            return (int) windowSize;
+        } else {
+            return reservation;
+        }
+    }
+
+
     @Override
     public void emitHeader(String name, String value, boolean neverIndex) {
         if (log.isDebugEnabled()) {
@@ -142,6 +183,12 @@ public class Stream extends AbstractStre
     }
 
 
+    @Override
+    protected int getWeight() {
+        return weight;
+    }
+
+
     Request getCoyoteRequest() {
         return coyoteRequest;
     }
@@ -201,12 +248,12 @@ public class Stream extends AbstractStre
                     thisWriteStream = reserveWindowSize(left);
                     if (thisWriteStream < 1) {
                         // Need to block until a WindowUpdate message is
-                        // processed for this stream;
+                        // processed for this stream
                         synchronized (this) {
                             try {
                                 wait();
                             } catch (InterruptedException e) {
-                                // TODO. Possible shutdown?
+                                // TODO: Possible shutdown?
                             }
                         }
                     }
@@ -215,14 +262,21 @@ public class Stream extends AbstractStre
                 // Flow control for the connection
                 int thisWrite;
                 do {
-                    thisWrite = handler.reserveWindowSize(thisWriteStream);
+                    thisWrite = handler.reserveWindowSize(Stream.this, thisWriteStream);
                     if (thisWrite < 1) {
-                        // TODO Flow control when connection window is exhausted
+                        // Need to block until a WindowUpdate message is
+                        // processed for this connection
+                        synchronized (this) {
+                            try {
+                                wait();
+                            } catch (InterruptedException e) {
+                                // TODO: Possible shutdown?
+                            }
+                        }
                     }
                 } while (thisWrite < 1);
 
                 incrementWindowSize(-thisWrite);
-                handler.incrementWindowSize(-thisWrite);
 
                 // Do the write
                 handler.writeBody(Stream.this, buffer, thisWrite);



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org