You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2012/03/13 06:37:15 UTC

svn commit: r1299981 - in /tomcat/trunk/java/org/apache: coyote/spdy/ tomcat/spdy/

Author: costin
Date: Tue Mar 13 05:37:14 2012
New Revision: 1299981

URL: http://svn.apache.org/viewvc?rev=1299981&view=rev
Log:
Update the spdy implementation to use the non-blocking apr socket. 
Fix various bugs found while stress testing.


Added:
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java
Modified:
    tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java
    tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java

Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java Tue Mar 13 05:37:14 2012
@@ -27,9 +27,11 @@ import org.apache.juli.logging.LogFactor
 import org.apache.tomcat.jni.Error;
 import org.apache.tomcat.jni.SSLExt;
 import org.apache.tomcat.jni.Status;
+import org.apache.tomcat.jni.socket.AprSocketContext;
 import org.apache.tomcat.spdy.CompressDeflater6;
 import org.apache.tomcat.spdy.SpdyConnection;
 import org.apache.tomcat.spdy.SpdyContext;
+import org.apache.tomcat.spdy.SpdyContextJni;
 import org.apache.tomcat.spdy.SpdyStream;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -65,7 +67,7 @@ public class SpdyAprNpnHandler implement
 
     private static final Log log = LogFactory.getLog(AprEndpoint.class);
 
-    private SpdyContext spdyContext;
+    private SpdyContextApr spdyContext;
 
     boolean ssl = true;
 
@@ -88,7 +90,7 @@ public class SpdyAprNpnHandler implement
     }
 
 
-    private final class SpdyContextApr extends SpdyContext {
+    private final class SpdyContextApr extends SpdyContextJni {
         private final AbstractEndpoint ep;
 
         private final Adapter adapter;
@@ -106,76 +108,6 @@ public class SpdyAprNpnHandler implement
         }
     }
 
-    public static class SpdyConnectionApr extends SpdyConnection {
-        long socket;
-
-        public SpdyConnectionApr(SocketWrapper<Long> socketW,
-                SpdyContext spdyContext, boolean ssl) {
-            super(spdyContext);
-            this.socket = socketW.getSocket().longValue();
-            if (ssl) {
-                setCompressSupport(new CompressDeflater6());
-            }
-        }
-
-        // TODO: write/read should go to SocketWrapper.
-        @Override
-        public int write(byte[] data, int off, int len) {
-            if (socket == 0 || inClosed) {
-                return -1;
-            }
-            int rem = len;
-            while (rem > 0) {
-                int sent = org.apache.tomcat.jni.Socket.send(socket, data, off,
-                        rem);
-                if (sent < 0) {
-                    inClosed = true;
-                    return -1;
-                }
-                if (sent == 0) {
-                    return len - rem;
-                }
-                rem -= sent;
-                off += sent;
-            }
-            return len;
-        }
-
-        /**
-         */
-        @Override
-        public int read(byte[] data, int off, int len) throws IOException {
-            if (socket == 0 || inClosed) {
-                return 0;
-            }
-            int rd = org.apache.tomcat.jni.Socket.recv(socket, data, off, len);
-            if (rd == -Status.APR_EOF) {
-                inClosed = true;
-                return -1;
-            }
-            if (rd == -Status.TIMEUP) {
-                rd = 0;
-            }
-            if (rd == -Status.EAGAIN) {
-                rd = 0;
-            }
-            if (rd < 0) {
-                // all other errors
-                inClosed = true;
-                throw new IOException("Error: " + rd + " "
-                        + Error.strerror((int) -rd));
-            }
-            off += rd;
-            len -= rd;
-            return rd;
-        }
-    }
-
-    // apr normally creates a new object on each poll.
-    // For 'upgraded' protocols we need to remember it's handled differently.
-    Map<Long, SpdyConnectionApr> lightProcessors =
-            new HashMap<Long, SpdyConnectionApr>();
-
     @Override
     public SocketState process(SocketWrapper<Long> socketO, SocketStatus status,
             Http11AprProtocol proto, AbstractEndpoint endpoint) {
@@ -183,61 +115,12 @@ public class SpdyAprNpnHandler implement
         SocketWrapper<Long> socketW = socketO;
         long socket = ((Long) socketW.getSocket()).longValue();
 
-        SpdyConnectionApr lh = lightProcessors.get(socket);
-        // Are we getting an HTTP request ?
-        if (lh == null && status != SocketStatus.OPEN) {
-            return null;
-        }
-
-        log.info("Status: " + status);
-
-        SocketState ss = null;
-        if (lh != null) {
-            // STOP, ERROR, DISCONNECT, TIMEOUT -> onClose
-            if (status == SocketStatus.TIMEOUT) {
-                // Called from maintain - we're removed from the poll
-                ((AprEndpoint) endpoint).getCometPoller().add(
-                        socketO.getSocket().longValue(), false);
-                return SocketState.LONG;
-            }
-            if (status == SocketStatus.STOP || status == SocketStatus.DISCONNECT ||
-                    status == SocketStatus.ERROR) {
-                SpdyConnectionApr wrapper = lightProcessors.remove(socket);
-                if (wrapper != null) {
-                    wrapper.onClose();
-                }
-                return SocketState.CLOSED;
-            }
-            int rc = lh.onBlockingSocket();
-            ss = (rc == SpdyConnection.LONG) ? SocketState.LONG
-                    : SocketState.CLOSED;
-        } else {
-            // OPEN, no existing socket
-            if (!ssl || SSLExt.checkNPN(socket, SpdyContext.SPDY_NPN)) {
-                // NPN negotiated or not ssl
-                lh = new SpdyConnectionApr(socketW, spdyContext, ssl);
-
-                int rc = lh.onBlockingSocket();
-                ss = (rc == SpdyConnection.LONG) ? SocketState.LONG
-                        : SocketState.CLOSED;
-                if (ss == SocketState.LONG) {
-                    lightProcessors.put(socketO.getSocket().longValue(), lh);
-                }
-            } else {
-                return null;
-            }
-        }
-
-        // OPEN is used for both 'first time' and 'new connection'
-        // In theory we shouldn't get another open while this is in
-        // progress ( only after we add back to the poller )
-
-        if (ss == SocketState.LONG) {
-            log.info("Long poll: " + status);
-            ((AprEndpoint) endpoint).getCometPoller().add(
-                    socketO.getSocket().longValue(), false);
+        try {
+            spdyContext.onAccept(socket);
+        } catch (IOException e) {
         }
-        return ss;
+        // No need to keep tomcat thread busy - but socket will be handled by apr socket context.
+        return SocketState.LONG;
     }
 
     public void onClose(SocketWrapper<Long> socketWrapper) {

Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Tue Mar 13 05:37:14 2012
@@ -93,7 +93,7 @@ public class SpdyProcessor extends Abstr
         public int doRead(ByteChunk bchunk, Request request) throws IOException {
             if (inFrame == null) {
                 // blocking
-                inFrame = spdyStream.getIn(endpoint.getSoTimeout());
+                inFrame = spdyStream.getDataFrame(endpoint.getSoTimeout());
             }
             if (inFrame == null) {
                 return -1;
@@ -388,13 +388,6 @@ public class SpdyProcessor extends Abstr
 
     }
 
-    private static byte[] STATUS = "status".getBytes();
-
-    private static byte[] VERSION = "version".getBytes();
-
-    private static byte[] HTTP11 = "HTTP/1.1".getBytes();
-
-    private static byte[] OK200 = "200 OK".getBytes();
 
     /**
      * When committing the response, we have to validate the set of headers, as
@@ -424,8 +417,6 @@ public class SpdyProcessor extends Abstr
 
     private void sendResponseHead() throws IOException {
         SpdyFrame rframe = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY);
-        // TODO: is closed ?
-        rframe.streamId = spdyStream.reqFrame.streamId;
         rframe.associated = 0;
 
         MimeHeaders headers = response.getMimeHeaders();
@@ -444,10 +435,8 @@ public class SpdyProcessor extends Abstr
             bc = mb.getByteChunk();
             rframe.headerValue(bc.getBuffer(), bc.getStart(), bc.getLength());
         }
-        rframe.headerName(STATUS, 0, STATUS.length);
-
         if (response.getStatus() == 0) {
-            rframe.headerValue(OK200, 0, OK200.length);
+            rframe.addHeader(SpdyFrame.STATUS, SpdyFrame.OK200);
         } else {
             // HTTP header contents
             String message = null;
@@ -466,12 +455,13 @@ public class SpdyProcessor extends Abstr
             // TODO: optimize
             String status = response.getStatus() + " " + message;
             byte[] statusB = status.getBytes();
+            rframe.headerName(SpdyFrame.STATUS, 0, SpdyFrame.STATUS.length);
             rframe.headerValue(statusB, 0, statusB.length);
         }
-        rframe.headerName(VERSION, 0, VERSION.length);
-        rframe.headerValue(HTTP11, 0, HTTP11.length);
+        rframe.addHeader(SpdyFrame.VERSION, SpdyFrame.HTTP11);
 
-        spdy.sendFrameBlocking(rframe, spdyStream);
+        rframe.streamId = spdyStream.reqFrame.streamId;
+        spdy.send(rframe, spdyStream);
         // we can't reuse the frame - it'll be queued, the coyote processor
         // may be reused as well.
         outCommit = true;

Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java Tue Mar 13 05:37:14 2012
@@ -17,8 +17,11 @@
 package org.apache.tomcat.spdy;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -124,10 +127,40 @@ public abstract class SpdyConnection { /
 
     private Condition outCondition;
 
+    public static final int LONG = 1;
+
+    public static final int CLOSE = -1;
+
+    private SpdyFrame nextFrame;
+
+    /**
+     * Handles the out queue for blocking sockets.
+     */
+    SpdyFrame out;
+
+    boolean draining = false;
+
+    private int goAway = Integer.MAX_VALUE;
+
     public SpdyConnection(SpdyContext spdyContext) {
-        this.setSpdyContext(spdyContext);
+        this.spdyContext = spdyContext;
         outCondition = framerLock.newCondition();
     }
+    
+    public String toString() {
+        return "SpdyCon open=" + channels.size();
+    }
+    
+    public void dump(PrintWriter out) {
+        out.println("SpdyConnection open=" + channels.size() + 
+                " outQ:" + outQueue.size());
+        for (SpdyStream str: channels.values()) {
+            str.dump(out);
+        }
+        
+        out.println();
+        
+    }
 
     /**
      * Write.
@@ -140,6 +173,8 @@ public abstract class SpdyConnection { /
      */
     public abstract int read(byte[] data, int off, int len) throws IOException;
 
+    public abstract void close() throws IOException;
+
     public void setCompressSupport(CompressSupport cs) {
         compressSupport = cs;
     }
@@ -151,7 +186,7 @@ public abstract class SpdyConnection { /
         return frame;
     }
 
-    public SpdyFrame getDataFrame() throws IOException {
+    public SpdyFrame getDataFrame() {
         SpdyFrame frame = getSpdyContext().getFrame();
         return frame;
     }
@@ -167,17 +202,24 @@ public abstract class SpdyConnection { /
      * - for fully non-blocking write: there will be a drain callback.
      */
 
-    /**
-     * Handles the out queue for blocking sockets.
-     */
-    SpdyFrame out;
-
-    boolean draining = false;
+    public void drain() {
+        synchronized (nbDrain) {
+            if (draining) {
+                return;
+            }
+            draining = true;
+        }
 
+        _drain();
+        synchronized (nbDrain) {
+            draining = false;
+        }
+    }
+    
     /**
      * Non blocking if the socket is not blocking.
      */
-    private boolean drain() {
+    private boolean _drain() {
         while (true) {
             framerLock.lock();
 
@@ -190,9 +232,18 @@ public abstract class SpdyConnection { /
                     if (out == null) {
                         return false;
                     }
+                    if (goAway < out.streamId) {
+                        
+                    }
                     SpdyFrame oframe = out;
                     try {
-                        if (oframe.type == TYPE_SYN_STREAM) {
+                        if (!oframe.c) {
+                            // late: IDs are assigned as we send ( priorities may affect
+                            // the transmission order )
+                            if (oframe.stream != null) {
+                                oframe.streamId = oframe.stream.getRequest().streamId;
+                            }
+                        } else if (oframe.type == TYPE_SYN_STREAM) {
                             oframe.fixNV(18);
                             if (compressSupport != null) {
                                 compressSupport.compress(oframe, 18);
@@ -211,7 +262,9 @@ public abstract class SpdyConnection { /
                     if (oframe.type == TYPE_SYN_STREAM) {
                         oframe.streamId = outStreamId;
                         outStreamId += 2;
-                        channels.put(oframe.streamId, oframe.stream);
+                        synchronized(channels) {
+                            channels.put(oframe.streamId, oframe.stream);
+                        }
                     }
 
                     oframe.serializeHead();
@@ -231,15 +284,20 @@ public abstract class SpdyConnection { /
 
             try {
                 int toWrite = out.endData - out.off;
-                int wr = write(out.data, out.off, toWrite);
-                if (wr < 0) {
-                    return false;
-                }
-                if (wr < toWrite) {
-                    out.off += wr;
-                    return true; // non blocking connection
-                }
-                out.off += wr;
+                int wr;
+                while (toWrite > 0) {
+                    wr = write(out.data, out.off, toWrite);
+                    if (wr < 0) {
+                        return false;
+                    }
+                    if (wr == 0) {
+                        return true; // non blocking or to
+                    }
+                    if (wr <= toWrite) {
+                        out.off += wr;
+                        toWrite -= wr;
+                    }
+                } 
                 // Frame was sent
                 framerLock.lock();
                 try {
@@ -247,6 +305,13 @@ public abstract class SpdyConnection { /
                 } finally {
                     framerLock.unlock();
                 }
+                
+                synchronized (channels) {
+                    if (out.stream != null && 
+                            out.stream.finRcvd && out.stream.finSent) {
+                        channels.remove(out.streamId);
+                    }
+                }
                 out = null;
             } catch (IOException e) {
                 // connection closed - abort all streams
@@ -258,31 +323,6 @@ public abstract class SpdyConnection { /
     }
 
     /**
-     * Blocking call for sendFrame: must be called from a thread pool.
-     *
-     * Will wait until the actual frame is sent.
-     */
-    public void sendFrameBlocking(SpdyFrame oframe, SpdyStream proc)
-            throws IOException {
-        queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue);
-
-        nonBlockingDrain();
-
-        while (!inClosed) {
-            framerLock.lock();
-            try {
-                if (oframe.off == oframe.endData) {
-                    return; // was sent
-                }
-                outCondition.await();
-            } catch (InterruptedException e) {
-            } finally {
-                framerLock.unlock();
-            }
-        }
-    }
-
-    /**
      * Send as much as possible without blocking.
      *
      * With a nb transport it should call drain directly.
@@ -296,23 +336,11 @@ public abstract class SpdyConnection { /
 
     Runnable nbDrain = new Runnable() {
         public void run() {
-            int i = drainCnt++;
-            long t0 = System.currentTimeMillis();
-            synchronized (nbDrain) {
-                if (draining) {
-                    return;
-                }
-                draining = true;
-            }
-
             drain();
-            synchronized (nbDrain) {
-                draining = false;
-            }
         }
     };
 
-    public void sendFrameNonBlocking(SpdyFrame oframe, SpdyStream proc)
+    public void send(SpdyFrame oframe, SpdyStream proc)
             throws IOException {
         queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue);
         nonBlockingDrain();
@@ -326,7 +354,7 @@ public abstract class SpdyConnection { /
         // We can't assing a stream ID until it is sent - priorities
         // we can't compress either - it's stateful.
         oframe.stream = proc;
-
+        
         framerLock.lock();
         try {
             outQueue.add(oframe);
@@ -370,12 +398,6 @@ public abstract class SpdyConnection { /
         }
     }
 
-    public static final int LONG = 1;
-
-    public static final int CLOSE = -1;
-
-    private SpdyFrame nextFrame;
-
     /**
      * Non-blocking method, read as much as possible and return.
      */
@@ -389,61 +411,66 @@ public abstract class SpdyConnection { /
                 inFrame.data = new byte[16 * 1024];
             }
             // we might already have data from previous frame
-            if (inFrame.endData < 8 || // we don't have the header
-                    inFrame.endData < inFrame.endFrame) { // size != 0 - we
-                                                          // parsed the header
-
-                int rd = read(inFrame.data, inFrame.endData,
-                        inFrame.data.length - inFrame.endData);
-                if (rd < 0) {
-                    abort("Closed");
+            if (inFrame.endReadData < 8 || // we don't have the header
+                    inFrame.endReadData < inFrame.endData) {
+
+                int rd = read(inFrame.data, inFrame.endReadData,
+                        inFrame.data.length - inFrame.endReadData);
+                if (rd == -1) {
+                    if (channels.size() == 0) {
+                        return CLOSE;
+                    } else {
+                        abort("Closed");
+                    }
+                } else if (rd < 0) {
+                    abort("Closed - read error");
                     return CLOSE;
-                }
-                if (rd == 0) {
+                } else if (rd == 0) {
                     return LONG;
                     // Non-blocking channel - will resume reading at off
                 }
-                inFrame.endData += rd;
+                inFrame.endReadData += rd;
             }
-            if (inFrame.endData < 8) {
+            if (inFrame.endReadData < 8) {
                 continue; // keep reading
             }
-            // We got the frame head
-            if (inFrame.endFrame == 0) {
+            if (inFrame.endData == 0) {
                 inFrame.parse();
                 if (inFrame.version != 2) {
                     abort("Wrong version");
                     return CLOSE;
                 }
 
-                // MAx_FRAME_SIZE
-                if (inFrame.endFrame < 0 || inFrame.endFrame > 32000) {
-                    abort("Framing error, size = " + inFrame.endFrame);
+                // MAX_FRAME_SIZE
+                if (inFrame.endData < 0 || inFrame.endData > 32000) {
+                    abort("Framing error, size = " + inFrame.endData);
                     return CLOSE;
                 }
 
-                // grow the buffer if needed. no need to copy the head, parsed
-                // ( maybe for debugging ).
-                if (inFrame.data.length < inFrame.endFrame) {
-                    inFrame.data = new byte[inFrame.endFrame];
+                // TODO: if data, split it in 2 frames
+                // grow the buffer if needed. 
+                if (inFrame.data.length < inFrame.endData) {
+                    byte[] tmp = new byte[inFrame.endData];
+                    System.arraycopy(inFrame.data, 0, tmp, 0, inFrame.endReadData);
+                    inFrame.data = tmp;
                 }
             }
 
-            if (inFrame.endData < inFrame.endFrame) {
+            if (inFrame.endReadData < inFrame.endData) {
                 continue; // keep reading to fill current frame
             }
             // else: we have at least the current frame
-            int extra = inFrame.endData - inFrame.endFrame;
+            int extra = inFrame.endReadData - inFrame.endData;
             if (extra > 0) {
                 // and a bit more - to keep things simple for now we
                 // copy them to next frame, at least we saved reads.
                 // it is possible to avoid copy - but later.
                 nextFrame = getSpdyContext().getFrame();
                 nextFrame.makeSpace(extra);
-                System.arraycopy(inFrame.data, inFrame.endFrame,
+                System.arraycopy(inFrame.data, inFrame.endData,
                         nextFrame.data, 0, extra);
-                nextFrame.endData = extra;
-                inFrame.endData = inFrame.endFrame;
+                nextFrame.endReadData = extra;
+                inFrame.endReadData = inFrame.endData;
             }
 
             // decompress
@@ -503,17 +530,38 @@ public abstract class SpdyConnection { /
     public void abort(String msg) {
         System.err.println(msg);
         inClosed = true;
-        // TODO: close all streams
 
+        List<Integer> ch = new ArrayList<Integer>(channels.keySet());
+        for (Integer i: ch) {
+            SpdyStream stream = channels.remove(i);
+            if (stream != null) {
+                stream.onReset();
+            }
+        }
+    }
+
+    public void abort(String msg, int last) {
+        System.err.println(msg);
+        inClosed = true;
+
+        List<Integer> ch = new ArrayList<Integer>(channels.keySet());
+        for (Integer i: ch) {
+            if (i > last) {
+                SpdyStream stream = channels.remove(i);
+                if (stream != null) {
+                    stream.onReset();
+                }
+            }
+        }
     }
 
     /**
      * Process a SPDY connection. Called in a separate thread.
-     *
+     * 
      * @return
      * @throws IOException
      */
-    public int handleFrame() throws IOException {
+    protected int handleFrame() throws IOException {
         if (inFrame.c) {
             switch (inFrame.type) {
             case TYPE_SETTINGS: {
@@ -529,7 +577,11 @@ public abstract class SpdyConnection { /
             case TYPE_GOAWAY: {
                 int lastStream = inFrame.readInt();
                 log.info("GOAWAY last=" + lastStream);
-                abort("GOAWAY");
+                
+                // Server will shut down - but will keep processing the current requests,
+                // up to lastStream. If we sent any new ones - they need to be canceled.
+                abort("GO_AWAY", lastStream);
+                goAway  = lastStream;
                 return CLOSE;
             }
             case TYPE_RST_STREAM: {
@@ -540,12 +592,19 @@ public abstract class SpdyConnection { /
                         + " "
                         + ((errCode < RST_ERRORS.length) ? RST_ERRORS[errCode]
                                 : errCode));
-                SpdyStream sch = channels.get(inFrame.streamId);
+                SpdyStream sch;
+                synchronized(channels) {
+                        sch = channels.get(inFrame.streamId);
+                }
                 if (sch == null) {
                     abort("Missing channel " + inFrame.streamId);
                     return CLOSE;
                 }
                 sch.onCtlFrame(inFrame);
+                
+                synchronized(channels) {
+                    channels.remove(inFrame.streamId);
+                }
                 inFrame = null;
                 break;
             }
@@ -569,7 +628,10 @@ public abstract class SpdyConnection { /
                 break;
             }
             case TYPE_SYN_REPLY: {
-                SpdyStream sch = channels.get(inFrame.streamId);
+                SpdyStream sch;
+                synchronized(channels) {
+                    sch = channels.get(inFrame.streamId);
+                }
                 if (sch == null) {
                     abort("Missing channel");
                     return CLOSE;
@@ -593,18 +655,26 @@ public abstract class SpdyConnection { /
                 oframe.append32(inFrame.read32());
                 oframe.pri = 0x80;
 
-                sendFrameNonBlocking(oframe, null);
+                send(oframe, null);
                 break;
             }
             }
         } else {
             // Data frame
-            SpdyStream sch = channels.get(inFrame.streamId);
+            SpdyStream sch;
+            synchronized (channels) {
+                sch = channels.get(inFrame.streamId);                
+            }
             if (sch == null) {
                 abort("Missing channel");
                 return CLOSE;
             }
             sch.onDataFrame(inFrame);
+            synchronized (channels) {
+                if (sch.finRcvd && sch.finSent) {
+                    channels.remove(inFrame.streamId);
+                }
+            }
             inFrame = null;
         }
         return LONG;
@@ -614,14 +684,10 @@ public abstract class SpdyConnection { /
         return spdyContext;
     }
 
-    public void setSpdyContext(SpdyContext spdyContext) {
-        this.spdyContext = spdyContext;
-    }
-
     public SpdyStream get(String host, String url) throws IOException {
         SpdyStream sch = new SpdyStream(this);
-        sch.addHeader("host", host);
-        sch.addHeader("url", url);
+        sch.getRequest().addHeader("host", host);
+        sch.getRequest().addHeader("url", url);
 
         sch.send();
 

Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java Tue Mar 13 05:37:14 2012
@@ -47,9 +47,12 @@ public class SpdyContext {
 
     private Executor executor;
 
-    int defaultFrameSize = 8196;
+    int defaultFrameSize = 8192;
 
-    public static boolean debug = true;
+    public static boolean debug = false;
+
+    boolean tls = true;
+    boolean compression = true;
 
     /**
      * Get a frame - frames are heavy buffers, may be reused.
@@ -62,6 +65,19 @@ public class SpdyContext {
     }
 
     /**
+     * Set the max frame size.
+     * 
+     * Larger data packets will be split in multiple frames.
+     * 
+     * ( the code is currently accepting larger control frames - it's not 
+     * clear if we should just reject them, many servers limit header size -
+     * the http connector also has a 8k limit - getMaxHttpHeaderSize )
+     */
+    public void setFrameSize(int frameSize) { 
+        defaultFrameSize = frameSize;
+    }
+    
+    /** 
      * Override for server side to return a custom stream.
      */
     public SpdyStream getStream(SpdyConnection framer) {
@@ -104,4 +120,14 @@ public class SpdyContext {
 
     public void releaseConnection(SpdyConnection con) {
     }
+    
+    public void listen(final int port, String cert, String key) throws IOException {
+        throw new IOException("Not implemented");
+    }    
+    
+    /**
+     * Close all pending connections and free resources.
+     */
+    public void stop() throws IOException {
+    }
 }

Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java?rev=1299981&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java Tue Mar 13 05:37:14 2012
@@ -0,0 +1,182 @@
+/*
+ */
+package org.apache.tomcat.spdy;
+
+import java.io.IOException;
+
+import org.apache.tomcat.jni.Status;
+import org.apache.tomcat.jni.socket.AprSocket;
+import org.apache.tomcat.jni.socket.AprSocketContext;
+import org.apache.tomcat.jni.socket.AprSocketContext.NonBlockingPollHandler;
+import org.apache.tomcat.jni.socket.AprSocketContext.TlsCertVerifier;
+
+public class SpdyContextJni extends SpdyContext {
+    AprSocketContext con;
+    
+    //AprSocketContext socketCtx;
+    
+    public SpdyContextJni() {
+        con = new AprSocketContext();
+        //if (insecureCerts) {
+        con.customVerification(new TlsCertVerifier() {
+            @Override
+            public void handshakeDone(AprSocket ch) {
+            }
+        });
+        //}
+        con.setNpn("spdy/2");
+    }
+    
+    @Override
+    public SpdyConnection getConnection(String host, int port) throws IOException {
+        SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(this);
+        
+        AprSocket ch = con.socket(host, port, tls);
+
+        spdy.setSocket(ch);
+
+        ch.connect();
+
+        ch.setHandler(new SpdySocketHandler(spdy));
+        
+        // need to consume the input to receive more read events
+        int rc = spdy.processInput();
+        if (rc == SpdyConnection.CLOSE) {
+            ch.close();
+            throw new IOException("Error connecting");
+        }
+
+        return spdy;
+    }
+    
+    public void onAccept(long socket) throws IOException {
+        SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(SpdyContextJni.this);
+        AprSocket s = con.socket(socket);
+        spdy.setSocket(s);
+        
+        SpdySocketHandler handler = new SpdySocketHandler(spdy);
+        s.setHandler(handler);    
+        handler.process(s, true, true, false);
+    }
+    
+    public void listen(final int port, String cert, String key) throws IOException {
+        con = new AprSocketContext() {
+            protected void onSocket(AprSocket s) throws IOException {
+                SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(SpdyContextJni.this);
+                spdy.setSocket(s);
+                
+                SpdySocketHandler handler = new SpdySocketHandler(spdy);
+                s.setHandler(handler);
+            }
+        };
+        
+        con.setNpn(SpdyContext.SPDY_NPN_OUT);
+        con.setKeys(cert, key);
+        
+        con.listen(port);
+    }
+
+    public void stop() throws IOException {
+        con.stop();
+    }
+    
+    public AprSocketContext getAprContext() {
+        return con;
+    } 
+    
+    // NB
+    class SpdySocketHandler implements NonBlockingPollHandler {
+        SpdyConnection con;
+        
+        SpdySocketHandler(SpdyConnection con) {
+            this.con = con;
+        }
+        
+        @Override
+        public void closed(AprSocket ch) {
+            // not used ( polling not implemented yet )
+        }
+
+        @Override
+        public void process(AprSocket ch, boolean in, boolean out, boolean close) {
+            try {
+                int rc = con.processInput();
+                if (rc == SpdyConnection.CLOSE) {
+                    ch.close();
+                }
+                con.drain();
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+                ch.reset();
+            }
+        }
+
+        @Override
+        public void connected(AprSocket ch) {
+        }
+
+        @Override
+        public void error(AprSocket ch, Throwable t) {
+        }
+        
+    }
+    
+    public static class SpdyConnectionAprSocket extends SpdyConnection {
+        AprSocket socket;
+
+        public SpdyConnectionAprSocket(SpdyContext spdyContext) {
+            super(spdyContext);
+            //setCompressSupport(new CompressJzlib());
+            if (spdyContext.compression) {
+                setCompressSupport(new CompressDeflater6());
+            }
+        }
+
+        public void setSocket(AprSocket ch) {
+            this.socket = ch;
+        }
+
+        @Override
+        public void close() throws IOException {
+            socket.close();
+        }
+        
+        @Override
+        public int write(byte[] data, int off, int len) throws IOException {
+            if (socket == null) {
+                return -1;
+            }
+            int sent = socket.write(data, off, len);
+            if (sent < 0) {
+                return -1;
+            }
+            return sent;
+        }
+
+        /**
+         * @throws IOException
+         */
+        @Override
+        public int read(byte[] data, int off, int len) throws IOException {
+            if (socket == null) {
+                return -1;
+            }
+            int rd = socket.read(data, off, len);
+            // org.apache.tomcat.jni.Socket.recv(socket, data, off, len);
+            if (rd == -Status.APR_EOF) {
+                return -1;
+            }
+            if (rd == -Status.TIMEUP || rd == -Status.EINTR || rd == -Status.EAGAIN) {
+                rd = 0;
+            }
+            if (rd < 0) {
+                return -1;
+            }
+            off += rd;
+            len -= rd;
+            return rd;
+        }
+    }
+
+}

Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java Tue Mar 13 05:37:14 2012
@@ -17,8 +17,10 @@
 package org.apache.tomcat.spdy;
 
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.Semaphore;
 
 /**
  * Spdy context for 'proxy' or test mode spdy - no NPN, no SSL, no compression.
@@ -78,6 +80,11 @@ public class SpdyContextProxy extends Sp
         }
 
         @Override
+        public void close() throws IOException {
+            socket.close();
+        }
+
+        @Override
         public synchronized int write(byte[] data, int off, int len) throws IOException {
             socket.getOutputStream().write(data, off, len);
             return len;
@@ -92,4 +99,52 @@ public class SpdyContextProxy extends Sp
             }
         }
     }
+    
+    
+    boolean running = true;
+    ServerSocket serverSocket;
+    
+    public void stop() throws IOException {
+        running = false;
+        serverSocket.close();
+    }
+    
+    /**
+     *  For small servers/testing: run in server mode.
+     *  Need to override onSynStream() to implement the logic.
+     */
+    public void listen(final int port, String cert, String key) throws IOException {
+        getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                accept(port);
+            }
+        });
+    }
+
+    private void accept(int port) {
+        try {
+            serverSocket = new ServerSocket(port);
+            while (running) {
+                final Socket socket = serverSocket.accept();
+                final SpdyConnection con = getConnection(socket);
+                getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        con.onBlockingSocket();
+                        try {
+                            socket.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        } catch (IOException ex) {
+            if (running) {
+                ex.printStackTrace();
+            }
+            running = false;
+        }
+    }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java Tue Mar 13 05:37:14 2012
@@ -16,7 +16,18 @@
  */
 package org.apache.tomcat.spdy;
 
+import java.util.Map;
+
 public class SpdyFrame {
+    public static byte[] STATUS = "status".getBytes();
+
+    public static byte[] VERSION = "version".getBytes();
+
+    public static byte[] HTTP11 = "HTTP/1.1".getBytes();
+
+    public static byte[] OK200 = "200 OK".getBytes();
+    
+    
     // This is a bit more complicated, to avoid multiple reads/writes.
     // We'll read as much as possible - possible past frame end. This may
     // cost an extra copy - or even more complexity for dealing with slices
@@ -25,18 +36,21 @@ public class SpdyFrame {
 
     public int off = 8; // used when reading - current offset
 
-    public int endFrame; // end of frame == size + 8
+    int endReadData; // how much has been read ( may be more or less than a frame )
 
     // On write it is incremented.
 
-    public int endData; // end of data in the buffer (may be past frame end)
-
+    /**
+     *  end of data in the buffer.
+     */
+    public int endData; 
+    
     // Processed data from the frame
     boolean c; // for control
 
     int version;
 
-    private int flags;
+    int flags;
 
     public int type;
 
@@ -62,7 +76,7 @@ public class SpdyFrame {
     public void recyle() {
         type = 0;
         c = false;
-        endFrame = 0;
+        endReadData = 0;
         off = 8;
         streamId = 0;
         nvCount = 0;
@@ -76,10 +90,10 @@ public class SpdyFrame {
             }
             return "C" + " S=" + streamId + (flags != 0 ? " F=" + flags : "")
                     + (version != 2 ? "  v" + version : "") + " t=" + type
-                    + " L=" + endFrame + "/" + off;
+                    + " L=" + endData + "/" + off;
         } else {
             return "D" + " S=" + streamId + (flags != 0 ? " F=" + flags : "")
-                    + " L=" + endFrame + "/" + off;
+                    + " L=" + endData + "/" + off;
         }
     }
 
@@ -118,7 +132,7 @@ public class SpdyFrame {
     }
 
     public boolean parse() {
-        endFrame = 0;
+        endData = 0;
         streamId = 0;
         nvCount = 0;
 
@@ -142,12 +156,12 @@ public class SpdyFrame {
         flags = data[4] & 0xFF;
         for (int i = 5; i < 8; i++) {
             b0 = data[i] & 0xFF;
-            endFrame = endFrame << 8 | b0;
+            endData = endData << 8 | b0;
         }
 
         // size will represent the end of the data ( header is held in same
         // buffer)
-        endFrame += 8;
+        endData += 8;
 
         return true;
     }
@@ -226,6 +240,37 @@ public class SpdyFrame {
         nvCount++;
         headerValue(buf, soff, len);
     }
+    
+    public void addHeader(String name, String value) {
+        byte[] nameB = name.getBytes();
+        headerName(nameB, 0, nameB.length);
+        nameB = value.getBytes();
+        headerValue(nameB, 0, nameB.length);
+    }
+
+    public void addHeader(byte[] nameB, String value) {
+        headerName(nameB, 0, nameB.length);
+        nameB = value.getBytes();
+        headerValue(nameB, 0, nameB.length);
+    }
+
+    public void addHeader(byte[] nameB, byte[] valueB) {
+        headerName(nameB, 0, nameB.length);
+        headerValue(valueB, 0, valueB.length);
+    }
+    
+    public void getHeaders(Map<String, String> resHeaders) {
+        for (int i = 0; i < nvCount; i++) {
+            int len = read16();
+            String n = new String(data, off, len, SpdyStream.UTF8);
+            advance(len);
+            len = read16();
+            String v = new String(data, off, len, SpdyStream.UTF8);
+            advance(len);
+            resHeaders.put(n, v);
+        }
+    }
+    
 
     // TODO: instead of that, use byte[][]
     void makeSpace(int len) {
@@ -294,11 +339,14 @@ public class SpdyFrame {
     }
 
     public int remaining() {
-        return endFrame - off;
+        return endData - off;
     }
 
     public void advance(int cnt) {
         off += cnt;
     }
 
-}
\ No newline at end of file
+    public boolean isData() {
+        return !c;
+    }
+}

Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java Tue Mar 13 05:37:14 2012
@@ -17,6 +17,7 @@
 package org.apache.tomcat.spdy;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -43,20 +44,44 @@ public class SpdyStream {
 
     public SpdyFrame reqFrame;
 
-    SpdyFrame resFrame;
+    public SpdyFrame resFrame;
 
-    BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>();
-
-    public static final SpdyFrame END_FRAME = new SpdyFrame(16);
+    /**
+     * For blocking support.
+     */
+    protected BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>();
 
-    boolean finSent;
+    protected boolean finSent;
 
     protected boolean finRcvd;
+    
+    /**
+     *  Dummy data frame to insert on reset / go away
+     */
+    static SpdyFrame END_FRAME;
+    
+    static {
+        END_FRAME = new SpdyFrame(16);
+        END_FRAME.endData = 0;
+        END_FRAME.off = 0;
+        END_FRAME.c = false; 
+        END_FRAME.flags =SpdyConnection.FLAG_HALF_CLOSE; 
+    }
 
     public SpdyStream(SpdyConnection spdy) {
         this.spdy = spdy;
     }
 
+    public void dump(PrintWriter out) {
+        if (reqFrame != null) {
+            out.println("Req: " + reqFrame);
+        }
+        if (resFrame != null) {
+            out.println("Res: " + resFrame);
+        }
+        out.println("In: " + inData.size() + (finRcvd ? " FIN":""));
+    }
+
     /**
      * Non-blocking, called when a data frame is received.
      *
@@ -64,10 +89,11 @@ public class SpdyStream {
      * buffer ( to avoid a copy ).
      */
     public void onDataFrame(SpdyFrame inFrame) {
-        inData.add(inFrame);
-        if (inFrame.closed()) {
-            finRcvd = true;
-            inData.add(END_FRAME);
+        synchronized(this) {
+            inData.add(inFrame);
+            if (inFrame.closed()) {
+                finRcvd = true;
+            }
         }
     }
 
@@ -84,29 +110,61 @@ public class SpdyStream {
             reqFrame = frame;
         } else if (frame.type == SpdyConnection.TYPE_SYN_REPLY) {
             resFrame = frame;
+        } else if (frame.type == SpdyConnection.TYPE_RST_STREAM) {
+            onReset();
         }
-        if (frame.isHalfClose()) {
-            finRcvd = true;
+        synchronized (this) {
+            inData.add(frame);
+            if (frame.isHalfClose()) {
+                finRcvd = true;
+            }            
         }
     }
 
+    /** 
+     * Called on GOAWAY or reset.
+     */
+    public void onReset() {
+        finRcvd = true;
+        finSent = true;
+        
+        // To unblock
+        inData.add(END_FRAME);
+    }
+    
     /**
      * True if the channel both received and sent FIN frames.
-     *
+     * 
      * This is tracked by the processor, to avoid extra storage in framer.
      */
     public boolean isFinished() {
         return finSent && finRcvd;
     }
-
-    public SpdyFrame getIn(long to) throws IOException {
+    
+    /**
+     * Waits and return the next data frame.
+     */
+    public SpdyFrame getDataFrame(long to) throws IOException {
+        while (true) {
+            SpdyFrame res = getFrame(to);
+            if (res == null || res.isData()) {
+                return res;
+            }
+        }
+    }
+    
+    /**
+     * Waits and return the next frame. First frame will be the control frame
+     */
+    public SpdyFrame getFrame(long to) throws IOException {
         SpdyFrame in;
         try {
-            if (inData.size() == 0 && finRcvd) {
-                return null;
+            synchronized (this) {
+                if (inData.size() == 0 && finRcvd) {
+                    return null;
+                }                
             }
             in = inData.poll(to, TimeUnit.MILLISECONDS);
-
             if (in == END_FRAME) {
                 return null;
             }
@@ -137,14 +195,14 @@ public class SpdyStream {
         return reqFrame;
     }
 
-    public void addHeader(String name, String value) {
-        byte[] nameB = name.getBytes();
-        getRequest().headerName(nameB, 0, nameB.length);
-        nameB = value.getBytes();
-        reqFrame.headerValue(nameB, 0, nameB.length);
+    public SpdyFrame getResponse() {
+        if (resFrame == null) {
+            resFrame = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY);
+            resFrame.streamId = reqFrame.streamId;
+        }
+        return resFrame;
     }
 
-
     public synchronized void sendDataFrame(byte[] data, int start,
             int length, boolean close) throws IOException {
 
@@ -159,12 +217,11 @@ public class SpdyStream {
         // 1 tcp packet. That's the current choice, seems closer to rest of
         // tomcat
 
-        oframe.streamId = reqFrame.streamId;
         if (close)
             oframe.halfClose();
 
         oframe.append(data, start, length);
-        spdy.sendFrameBlocking(oframe, this);
+        spdy.send(oframe, this);
     }
 
     public void send() throws IOException {
@@ -172,8 +229,8 @@ public class SpdyStream {
     }
 
     public void send(String host, String url, String scheme, String method) throws IOException {
-        addHeader("host", host);
-        addHeader("url", url);
+        getRequest().addHeader("host", host);
+        getRequest().addHeader("url", url);
 
         send(scheme, method);
     }
@@ -184,13 +241,14 @@ public class SpdyStream {
             // TODO: add the others
             reqFrame.halfClose();
         }
-        addHeader("scheme", "http"); // todo
-        addHeader("method", method);
-        addHeader("version", "HTTP/1.1");
+        getRequest().addHeader("scheme", "http"); // todo
+        getRequest().addHeader("method", method);
+        getRequest().addHeader("version", "HTTP/1.1");
         if (reqFrame.isHalfClose()) {
             finSent = true;
         }
-        spdy.sendFrameBlocking(reqFrame, this);
+        spdy.send(reqFrame, this);
     }
 
-}
\ No newline at end of file
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org