You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2010/01/13 03:07:27 UTC

svn commit: r898619 - in /tomcat/trunk/modules/tomcat-lite: java/org/apache/tomcat/lite/http/ java/org/apache/tomcat/lite/io/ java/org/apache/tomcat/lite/service/ test/org/apache/tomcat/lite/ test/org/apache/tomcat/lite/http/ test/org/apache/tomcat/lit...

Author: costin
Date: Wed Jan 13 02:07:25 2010
New Revision: 898619

URL: http://svn.apache.org/viewvc?rev=898619&view=rev
Log:
Moved the connection pool to a top level class and started to add more code. Still missing is evicting kept-alive connections and 
queueing to limit the number of active requests per host ( and probably more ).
Started to make spdy more like a part of a http request - i.e. upgrade if supported by both ends, etc.  Now load
tests seem to work - no more OOM. Due to compression spdy it's using more memory per connection, current tests don't enable compression
( it's accepted for incoming connections ).  


Modified:
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java
    tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java
    tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java
    tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java
    tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java
    tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java
    tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java
    tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java Wed Jan 13 02:07:25 2010
@@ -38,22 +38,34 @@
     long dictId;
     
     public CompressFilter() {
-        cStream = new ZStream();
-        cStream.deflateInit(JZlib.Z_BEST_COMPRESSION);
-        
-        dStream = new ZStream();
-        dStream.inflateInit();
     }
     
     public void recycle() {
+        if (cStream == null) {
+            return;
+        }
+        cStream.free();
+        cStream = null;
+        dStream.free();
+        dStream = null;
+    }
+    
+    public void init() {
+        if (cStream != null) {
+            return;
+        }
         // can't call: cStream.free(); - will kill the adler, NPE
-
-        cStream.deflateInit(JZlib.Z_BEST_COMPRESSION);
+        cStream = new ZStream();
+        // BEST_COMRESSION results in 256Kb per Deflate
+        cStream.deflateInit(JZlib.Z_BEST_SPEED);
+        
+        dStream = new ZStream();
         dStream.inflateInit();
 
     }
     
     CompressFilter setDictionary(byte[] dict, long id) {
+        init();
         this.dict = dict;
         this.dictId = id;
         cStream.deflateSetDictionary(dict, dict.length);
@@ -61,6 +73,7 @@
     }
 
     void compress(IOBuffer in, IOBuffer out) throws IOException {
+        init();
         BBucket bb = in.popFirst();
         
         while (bb != null) {
@@ -80,6 +93,7 @@
         // TODO: only the last one needs flush
 
         // TODO: size missmatches ?
+        init();
         int flush = JZlib.Z_PARTIAL_FLUSH;
 
         cStream.next_in = bb.array();
@@ -129,6 +143,7 @@
     }
     
     void decompress(IOBuffer in, IOBuffer out, int len) throws IOException {
+        init();
         BBucket bb = in.peekFirst();
         
         while (bb != null && len > 0) {

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java Wed Jan 13 02:07:25 2010
@@ -12,12 +12,16 @@
 import org.apache.tomcat.lite.io.BBucket;
 import org.apache.tomcat.lite.io.BBuffer;
 import org.apache.tomcat.lite.io.CBuffer;
+import org.apache.tomcat.lite.io.IOConnector;
+import org.apache.tomcat.lite.io.DumpChannel;
 import org.apache.tomcat.lite.io.FastHttpDateFormat;
 import org.apache.tomcat.lite.io.Hex;
 import org.apache.tomcat.lite.io.IOBuffer;
 import org.apache.tomcat.lite.io.IOChannel;
+import org.apache.tomcat.lite.io.SslChannel;
 
-public class Http11Connection extends HttpConnection {
+public class Http11Connection extends HttpConnection 
+        implements IOConnector.ConnectedCallback {
     public static final String CHUNKED = "chunked";
     
     public static final String CLOSE = "close"; 
@@ -33,8 +37,7 @@
     static final byte COLON = (byte) ':';
 
     // super.net is the socket 
-
-    HttpChannel activeHttp;
+    
     boolean debug;
     BBuffer line = BBuffer.wrapper(); 
     boolean endSent = false;
@@ -74,7 +77,11 @@
     }
 
     public void beforeRequest() {
-        activeHttp = null;
+        nextRequest();
+        headRecvBuf.recycle();
+    }
+
+    public void nextRequest() {
         endSent = false;
         keepAlive = true;
         receiveBodyState.recycle();
@@ -84,7 +91,6 @@
         http10 = false;
         headersReceived = false;
         bodyReceived = false;
-        headRecvBuf.recycle();
     }
     
     public Http11Connection serverMode() {
@@ -108,7 +114,8 @@
                 if (headRecvBuf.get(0) == 0x80 && 
                         headRecvBuf.get(1) == 0x01) {
                     // SPDY signature ( experimental )
-                    switchedProtocol = new SpdyConnection(httpConnector);
+                    switchedProtocol = new SpdyConnection(httpConnector, 
+                            remoteHost);
                     if (serverMode) {
                         switchedProtocol.serverMode = true;
                     }
@@ -167,9 +174,9 @@
     }
 
     @Override
-    public void handleReceived(IOChannel netx) throws IOException {
+    public void dataReceived(IOBuffer netx) throws IOException {
         if (switchedProtocol != null) {
-            switchedProtocol.handleReceived(netx);
+            switchedProtocol.dataReceived(netx);
             return;
         }
         //trace("handleReceived " + headersReceived);
@@ -299,7 +306,7 @@
         if (http09) {
             keepAlive = false;
         }
-        if (!net.isOpen()) {
+        if (net != null && !net.isOpen()) {
             keepAlive = false;
         }
         return keepAlive;
@@ -311,8 +318,8 @@
             switchedProtocol.endSendReceive(http);
             return;
         }
-        activeHttp = null; 
-        if (!keepAlive()) {
+        boolean keepAlive = keepAlive();
+        if (!keepAlive) {
             if (debug) {
                 log.info("--- Close socket, no keepalive " + net);
             }
@@ -321,27 +328,16 @@
                 net.startSending();
 
             }
-            beforeRequest(); 
-            return;
         }
         synchronized (readLock) {
-            beforeRequest(); // will clear head buffer
             requestCount++;
-            if (serverMode) {
-                if (debug) {
-                    log.info(">>> server socket KEEP_ALIVE " + net.getTarget() + 
-                            " " + net + " " + net.getIn().available());
-                }
+            beforeRequest();
+            httpConnector.cpool.afterRequest(http, this, true);
+
+            if (serverMode && keepAlive) {
                 handleReceived(net); // will attempt to read next req
-            } else {
-                if (debug) {
-                    log.info(">>> client socket KEEP_ALIVE " + net.getTarget() + 
-                            " " + net);
-                }
-                httpConnector.cpool.returnChannel(this);
             }
         }
-
     }
     
     private void trace(String s) {
@@ -741,8 +737,6 @@
             return;
         }
 
-        this.activeHttp = http;
-        
         // Update transfer fields based on headers.
         processProtocol(http.getRequest().protocol());
         updateKeepAlive(http.getRequest().getMimeHeaders(), true);
@@ -1402,5 +1396,33 @@
         (bodyReceived ? " BODY " : "")
         ;  
     }
-    
+
+    @Override
+    public void handleConnected(IOChannel net) throws IOException {
+
+        HttpChannel httpCh = activeHttp; 
+        boolean ssl = httpCh.getRequest().isSecure();
+        if (ssl) {
+            SslChannel ch1 = new SslChannel();
+            ch1.setSslContext(httpConnector.sslConnector.getSSLContext());
+            ch1.setSink(net);
+            net.addFilterAfter(ch1);
+            net = ch1;
+        }
+        if (httpConnector.debugHttp) {
+            IOChannel ch1 = new DumpChannel("");
+            net.addFilterAfter(ch1);
+            net = ch1;                        
+        }
+
+        if (!net.isOpen()) {
+            httpCh.abort("Can't connect");
+            return;
+        }
+        
+        setSink(net);
+        
+        sendRequest(httpCh);
+    }
+
 }

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java Wed Jan 13 02:07:25 2010
@@ -316,7 +316,7 @@
         if (target == null) {
             return ":0"; // server mode ? 
         }
-        return target;
+        return target.toString();
     }
 
 
@@ -797,11 +797,25 @@
     }
     
     
-    
+    /**
+     * This method will be called when the http headers have been received - 
+     * the body may or may not be available. 
+     * 
+     * In server mode this is equivalent with a servlet request. 
+     * This is also called for http client, when the response headers
+     * are received. 
+     *  
+     * TODO: rename it to HttMessageReceived or something similar.
+     */
     public static interface HttpService {
         void service(HttpRequest httpReq, HttpResponse httpRes) throws IOException;
     }
     
+    /** 
+     * Called when both request and response bodies have been sent/
+     * received. After this call the HttpChannel will be disconnected
+     * from the http connection, which can be used for other requests. 
+     */
     public static interface RequestCompleted {
         void handle(HttpChannel data, Object extraData) throws IOException;
     }
@@ -814,11 +828,4 @@
     };
     
 
-    IOConnector.ConnectedCallback connectedCallback = new IOConnector.ConnectedCallback() {
-        @Override
-        public void handleConnected(IOChannel ch) throws IOException {
-            httpConnector.handleConnected(ch, HttpChannel.this);
-        }
-    };
-    
 }
\ No newline at end of file

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java Wed Jan 13 02:07:25 2010
@@ -44,8 +44,6 @@
         public void onDestroy(HttpChannel ch, HttpConnector con) throws IOException;
     }
     
-    HttpConnectionManager conManager = new HttpConnectionManager();
-    
     private static Logger log = Logger.getLogger("HttpConnector");
     
     /** 
@@ -84,7 +82,7 @@
     public AtomicInteger recycledChannels = new AtomicInteger();
     public AtomicInteger reusedChannels = new AtomicInteger();
 
-    public ConnectionPool cpool = new ConnectionPool();
+    public HttpConnectionPool cpool = new HttpConnectionPool(this);
         
     // Host + context mapper.
     Dispatcher dispatcher;
@@ -332,11 +330,6 @@
 
     }
     
-    HttpConnection newConnection() {
-        return conManager.newConnection(this);
-    }
-
-
     private class AcceptorCallback implements IOConnector.ConnectedCallback {
         @Override
         public void handleConnected(IOChannel accepted) throws IOException {
@@ -346,7 +339,7 @@
 
     public HttpConnection handleAccepted(IOChannel accepted) throws IOException {
         // TODO: reuse
-        HttpConnection shttp = newConnection();
+        HttpConnection shttp = cpool.accepted(accepted);
         shttp.serverMode = true;
 
         if (debugHttp) {
@@ -371,50 +364,11 @@
         return this;
     }
     
-    public void handleConnected(IOChannel net, HttpChannel httpCh) 
-            throws IOException {
-        boolean ssl = httpCh.getRequest().isSecure();
-        if (ssl) {
-            SslChannel ch1 = new SslChannel();
-            ch1.setSslContext(sslConnector.getSSLContext());
-            ch1.setSink(net);
-            net.addFilterAfter(ch1);
-            net = ch1;
-        }
-        if (debugHttp) {
-            IOChannel ch1 = new DumpChannel("");
-            net.addFilterAfter(ch1);
-            net = ch1;                        
-        }
-        
-        if (!net.isOpen()) {
-            httpCh.abort("Can't connect");
-            return;
-        }
-        HttpConnection httpStream = newConnection();
-        httpStream.setSink(net);
-
-        // TODO: add it to the cpool
-        httpCh.setConnection(httpStream);
-        
-        httpStream.sendRequest(httpCh);
-    }
-
-    public static class HttpConnectionManager {
-        public HttpConnection newConnection(HttpConnector con) {
-            return new Http11Connection(con);
-        }
-        
-        public HttpConnection getFromPool(RemoteServer t) {
-            return t.connections.remove(t.connections.size() - 1);            
-        }
-    }
-    
     /**
      * Actual HTTP/1.1 wire protocol. 
      *  
      */
-    public static class HttpConnection extends IOChannel
+    public static abstract class HttpConnection extends IOChannel
         implements DataReceivedCallback
     {
         protected HttpConnector httpConnector;
@@ -422,9 +376,18 @@
 
         protected BBuffer headRecvBuf = BBuffer.allocate(8192);
         protected CompressFilter compress = new CompressFilter();
+        
+        protected boolean secure = false;
+        
+        protected HttpConnectionPool.RemoteServer remoteHost;
+        // If set, the connection is in use ( active )
+        // null == keep alive. Changes synchronized on remoteHost
+        // before/after request
+        protected HttpChannel activeHttp;
 
         @Override
-        public void handleReceived(IOChannel ch) throws IOException {
+        public final void handleReceived(IOChannel ch) throws IOException {
+            int before = ch.getIn().available();
             dataReceived(ch.getIn());
         }
 
@@ -451,9 +414,7 @@
         /** 
          * Incoming data.
          */
-        public void dataReceived(IOBuffer iob) throws IOException {
-            
-        }
+        public abstract void dataReceived(IOBuffer iob) throws IOException;
         
         /** 
          * Framing error, client interrupt, etc.
@@ -474,12 +435,12 @@
 
         @Override
         public IOBuffer getIn() {
-            return net.getIn();
+            return net == null ? null : net.getIn();
         }
 
         @Override
         public IOBuffer getOut() {
-            return net.getOut();
+            return net == null ? null : net.getOut();
         }
 
         @Override
@@ -493,188 +454,22 @@
         protected void outClosed(HttpChannel http) throws IOException {
         }
 
-        protected void endSendReceive(HttpChannel httpChannel) throws IOException {
-            return;
-        }
-
-        public void withExtraBuffer(BBuffer received) {
-            return;
-        }
-        
-    }
-
-    /** 
-     * Connections for one remote host.
-     * This should't be restricted by IP:port or even hostname,
-     * for example if a server has multiple IPs or LB replicas - any would work.   
-     */
-    public static class RemoteServer {
-        public ArrayList<HttpConnection> connections = new ArrayList<HttpConnection>();
-    }
-
-    // TODO: add timeouts, limits per host/total, expire old entries 
-    // TODO: discover apr and use it
-    
-    public class ConnectionPool {
-        // visible for debugging - will be made private, with accessor 
         /**
-         * Map from client names to socket pools.
+         * Called by HttpChannel when both input and output are fully 
+         * sent/received. When this happens the request is no longer associated
+         * with the Connection, and the connection can be re-used. 
+         * 
+         * The channel can still be used to access the retrieved data that may
+         * still be buffered until HttpChannel.release() is called.
+         * 
+         * This method will be called only once, for both succesful and aborted
+         * requests. 
          */
-        public Map<CharSequence, RemoteServer> hosts = new HashMap<CharSequence, 
-            RemoteServer>();
-
-        // Statistics
-        public AtomicInteger waitingSockets = new AtomicInteger();
-        public AtomicInteger closedSockets = new AtomicInteger();
-
-        public AtomicInteger hits = new AtomicInteger();
-        public AtomicInteger misses = new AtomicInteger();
-
-        public int getTargetCount() {
-            return hosts.size();
-        }
-
-        public int getSocketCount() {
-            return waitingSockets.get();
-        }
-
-        public int getClosedSockets() {
-            return closedSockets.get();
-        }
-
-        public Set<CharSequence> getKeepAliveTargets() {
-            return hosts.keySet();
-        }
-
-        /** 
-         * @param key host:port, or some other key if multiple hosts:ips
-         * are connected to equivalent servers ( LB ) 
-         * @param httpCh 
-         * @throws IOException 
-         */
-        public HttpConnection send(HttpChannel httpCh) 
-                throws IOException {
-            String target = httpCh.getTarget();
-            HttpConnection con = null;
-            // TODO: check ssl on connection - now if a second request 
-            // is received on a ssl connection - we just send it
-            boolean ssl = httpCh.getRequest().isSecure();
-            
-            RemoteServer t = null;
-            synchronized (hosts) {
-                t = hosts.get(target);
-                if (t == null) {
-                    misses.incrementAndGet();
-                }
-            }
-            if (t != null) {
-                synchronized (t) {
-                    if (t.connections.size() == 0) {
-                        misses.incrementAndGet();
-                    } else {
-                        con = conManager.getFromPool(t);
-
-                        if (!con.isOpen()) {
-                            con.setDataReceivedCallback(null);
-                            con.close();
-                            log.fine("Already closed " + con);
-                            con = null;
-                            misses.incrementAndGet();
-                        } else {
-                            hits.incrementAndGet();
-                            if (debug) {
-                                httpCh.trace("HTTP_CONNECT: Reuse connection " + target + " " + this);
-                            }
-                        }
-                    }
-                }
-            }
-            
-            if (con == null) {
-                if (debug) {
-                    httpCh.trace("HTTP_CONNECT: New connection " + target);
-                }
-                String[] hostPort = target.split(":");
-                
-                int targetPort = ssl ? 443 : 80;
-                if (hostPort.length > 1) {
-                    targetPort = Integer.parseInt(hostPort[1]);
-                }
-                
-                getIOConnector().connect(hostPort[0], targetPort,
-                        httpCh.connectedCallback);
-            } else {
-                con.beforeRequest();
-                httpCh.setConnection(con);
-                con.sendRequest(httpCh);
-            }
-            
-            
-            return con;      
-        }
+        protected abstract void endSendReceive(HttpChannel httpChannel) throws IOException;
 
-        /**
-         * Must be called in IOThread for the channel
-         */
-        public void returnChannel(HttpConnection ch) 
-                throws IOException {
-            CharSequence key = ch.getTarget(); 
-            if (key == null) {
-                ch.close();
-                if (debug) {
-                    log.info("Return channel, no target ..." + key + " " + ch);
-                }
-                return;
-            }
-            
-            if (!ch.isOpen()) {
-                ch.close(); // make sure all closed
-                if (debug) {
-                    log.info("Return closed channel ..." + key + " " + ch);
-                }
-                return;
-            }
-            
-            RemoteServer t = null;
-            synchronized (hosts) {
-                t = hosts.get(key);
-                if (t == null) {
-                    t = new RemoteServer();
-                    hosts.put(key, t);
-                }
-            }
-            waitingSockets.incrementAndGet();
-            
-            ch.ts = System.currentTimeMillis();
-            synchronized (t) {
-                t.connections.add(ch);      
-            }
+        public void withExtraBuffer(BBuffer received) {
+            return;
         }
         
-        // Called by handleClosed
-        void stopKeepAlive(IOChannel schannel) {
-            CharSequence target = schannel.getTarget();
-            RemoteServer t = null;
-            synchronized (hosts) {
-                t = hosts.get(target);
-                if (t == null) {
-                    return;
-                }
-            }
-            synchronized (t) {
-                if (t.connections.remove(schannel)) {      
-                    waitingSockets.decrementAndGet();
-                    if (t.connections.size() == 0) {
-                        hosts.remove(target);
-                    }
-                }
-            }
-        }
-    }
-
-    public HttpConnector withConnectionManager(
-            HttpConnectionManager connectionManager) {
-        this.conManager = connectionManager;
-        return this;
     }
 }

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java Wed Jan 13 02:07:25 2010
@@ -3,18 +3,26 @@
 package org.apache.tomcat.lite.http;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.tomcat.lite.http.HttpConnector.HttpConnection;
-import org.apache.tomcat.lite.http.HttpConnector.RemoteServer;
+import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer;
 import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes;
 import org.apache.tomcat.lite.io.BBucket;
 import org.apache.tomcat.lite.io.BBuffer;
 import org.apache.tomcat.lite.io.CBuffer;
+import org.apache.tomcat.lite.io.DumpChannel;
 import org.apache.tomcat.lite.io.IOBuffer;
+import org.apache.tomcat.lite.io.IOChannel;
+import org.apache.tomcat.lite.io.IOConnector;
+import org.apache.tomcat.lite.io.SslChannel;
 
 /*
  * TODO: expectations ? 
@@ -27,23 +35,9 @@
  *    http://localhost:8802/hello
  */
 
-public class SpdyConnection extends HttpConnector.HttpConnection  {
+public class SpdyConnection extends HttpConnector.HttpConnection 
+        implements IOConnector.ConnectedCallback {
     
-    public static class SpdyConnectionManager 
-        extends HttpConnector.HttpConnectionManager {
-        @Override
-        public HttpConnection newConnection(HttpConnector con) {
-            return new SpdyConnection(con);
-        }
-
-        @Override
-        public HttpConnection getFromPool(RemoteServer t) {
-            // TODO: we may initiate multiple SPDY connections with each server
-            // Sending frames is synchronized, receiving is muxed
-            return t.connections.get(0);
-        }
-        
-    }
     
     /** Use compression for headers. Will magically turn to false
      * if the first request doesn't have x8xx ( i.e. compress header )
@@ -77,11 +71,16 @@
     
     /**
      * @param spdyConnector
+     * @param remoteServer 
      */
-    SpdyConnection(HttpConnector spdyConnector) {
+    SpdyConnection(HttpConnector spdyConnector, RemoteServer remoteServer) {
         this.httpConnector = spdyConnector;
+        this.remoteHost = remoteServer;
+        this.target = remoteServer.target;
     }
 
+    AtomicInteger streamErrors = new AtomicInteger();
+    
     AtomicInteger lastInStream = new AtomicInteger();
     AtomicInteger lastOutStream = new AtomicInteger();
 
@@ -111,9 +110,12 @@
     }
     
     @Override
-    public void dataReceived(IOBuffer iob) throws IOException {
-        int avail = iob.available();
-        while (avail > 0) {
+    public synchronized void dataReceived(IOBuffer iob) throws IOException {
+        while (true) {
+            int avail = iob.available();
+            if (avail == 0) {
+                return;
+            }
             if (currentInFrame == null) {
                 if (inFrameBuffer.remaining() + avail < 8) {
                     return;
@@ -121,12 +123,11 @@
                 if (inFrameBuffer.remaining() < 8) {
                     int headRest = 8 - inFrameBuffer.remaining();
                     int rd = iob.read(inFrameBuffer, headRest);
-                    avail -= rd;
                 }
                 currentInFrame = new SpdyConnection.Frame(); // TODO: reuse
-                currentInFrame.parse(inFrameBuffer);
+                currentInFrame.parse(this, inFrameBuffer);
             }
-            if (avail < currentInFrame.length) {
+            if (iob.available() < currentInFrame.length) {
                 return;
             }
             // We have a full frame. Process it.
@@ -134,11 +135,21 @@
 
             // TODO: extra checks, make sure the frame is correct and
             // it consumed all data.
-            avail -= currentInFrame.length;
             currentInFrame = null;
         }
     }
 
+    AtomicInteger inFrames = new AtomicInteger();
+    AtomicInteger inDataFrames = new AtomicInteger();
+    AtomicInteger inSyncStreamFrames = new AtomicInteger();
+    AtomicInteger inBytes = new AtomicInteger();
+
+    AtomicInteger outFrames = new AtomicInteger();
+    AtomicInteger outDataFrames = new AtomicInteger();
+    AtomicInteger outBytes = new AtomicInteger();
+    
+    
+    
     /**
      * Frame received. Must consume all data for the frame.
      * 
@@ -148,11 +159,14 @@
     protected void onFrame(IOBuffer iob) throws IOException {
         // TODO: make sure we have enough data.
         lastFrame = currentInFrame;
+        inFrames.incrementAndGet();
+        inBytes.addAndGet(currentInFrame.length + 8);
         
         if (currentInFrame.c) {
             if (currentInFrame.type == SpdyConnection.Frame.TYPE_HELO) {
                 // receivedHello = currentInFrame;
             } else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_STREAM) {
+                inSyncStreamFrames.incrementAndGet();
                 HttpChannel ch = new HttpChannel(); // TODO: reuse
                 ch.channelId = SpdyConnection.readInt(iob);
                 ch.setConnection(this);
@@ -164,15 +178,22 @@
                     ch.setHttpService(this.httpConnector.defaultService);
                 }
 
-                channels.put(ch.channelId, ch);
-
-                // pri and unused
-                SpdyConnection.readShort(iob);
-
-                HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes();
-                
-                BBuffer head = processHeaders(iob, ch, reqBytes);
+                synchronized (this) {
+                        channels.put(ch.channelId, ch);                    
+                }
 
+                try {
+                    // pri and unused
+                    SpdyConnection.readShort(iob);
+
+                    HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes();
+
+                    processHeaders(iob, ch, reqBytes);
+                } catch (Throwable t) {
+                    log.log(Level.SEVERE, "Error parsing head", t);
+                    abort("Error reading headers " + t);
+                    return;
+                }
                 ch.getRequest().processReceivedHeaders();
 
                 ch.handleHeadersReceived(ch.getRequest());
@@ -183,14 +204,24 @@
                 }
             } else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_REPLY) {
                 int chId = SpdyConnection.readInt(iob);
-                HttpChannel ch = channels.get(chId);
-                
-                SpdyConnection.readShort(iob);
+                HttpChannel ch;
+                synchronized (this) {
+                    ch = channels.get(chId);
+                    if (ch == null) {
+                        abort("Channel not found");
+                    }
+                }
+                try {
+                    SpdyConnection.readShort(iob);
         
-                HttpMessageBytes resBytes = ch.getResponse().getMsgBytes();
+                    HttpMessageBytes resBytes = ch.getResponse().getMsgBytes();
                 
-                BBuffer head = processHeaders(iob, ch, resBytes);
-
+                    BBuffer head = processHeaders(iob, ch, resBytes);
+                } catch (Throwable t) {
+                    log.log(Level.SEVERE, "Error parsing head", t);
+                    abort("Error reading headers " + t);
+                    return;
+                }
                 ch.getResponse().processReceivedHeaders();
 
                 ch.handleHeadersReceived(ch.getResponse());
@@ -204,8 +235,12 @@
                 iob.advance(currentInFrame.length);
             }
         } else {
+            inDataFrames.incrementAndGet();
             // data frame - part of an existing stream
-            HttpChannel ch = channels.get(currentInFrame.streamId);
+            HttpChannel ch;
+            synchronized (this) {
+                ch = channels.get(currentInFrame.streamId);
+            }
             if (ch == null) {
                 log.warning("Unknown stream ");
                 net.close();
@@ -215,9 +250,14 @@
             int len = currentInFrame.length;
             while (len > 0) {
                 BBucket bb = iob.peekFirst();
+                if (bb == null) {
+                    // we should have all data
+                    abort("Unexpected short read");
+                    return;
+                }
                 if (len > bb.remaining()) {
                     ch.getIn().append(bb);
-                    len += bb.remaining();
+                    len -= bb.remaining();
                     bb.position(bb.limit());
                 } else {
                     ch.getIn().append(bb, len);
@@ -228,19 +268,31 @@
             ch.sendHandleReceivedCallback();
             
             if ((currentInFrame.flags & SpdyConnection.Frame.FLAG_HALF_CLOSE) != 0) {
-                ch.getIn().close();
                 ch.handleEndReceive();
             }
         }
         firstFrame = false;
     }
+    
+    /**
+     * On frame error.
+     */
+    private void abort(String msg) throws IOException {
+        streamErrors.incrementAndGet();
+        for (HttpChannel ch : channels.values()) {
+            ch.abort(msg);
+        }
+        close();
+    }
 
     private BBuffer processHeaders(IOBuffer iob, HttpChannel ch,
             HttpMessageBytes reqBytes) throws IOException {
-        int res = iob.peek() & 0xFF;
         int nvCount = 0;
-        if (firstFrame && (res & 0x0F) !=  8) {
-            headerCompression = false;
+        if (firstFrame) {
+            int res = iob.peek() & 0xFF;
+            if ((res & 0x0F) !=  8) {
+                headerCompression = false;
+            }
         }
         headRecvBuf.recycle();
         if (headerCompression) {
@@ -257,7 +309,11 @@
         } else {
             nvCount = readShort(iob);
             // 8 = stream Id (4) + pri/unused (2) + nvCount (2)
-            iob.read(headRecvBuf, currentInFrame.length - 8);
+            // we know we have enough data
+            int rd = iob.read(headRecvBuf, currentInFrame.length - 8);
+            if (rd != currentInFrame.length - 8) {
+                abort("Unexpected incomplete read");
+            }
         }
         // Wrapper - so we don't change position in head
         headRecvBuf.wrapTo(headW);
@@ -268,6 +324,9 @@
         for (int i = 0; i < nvCount; i++) {
 
             int nameLen = SpdyConnection.readShort(headW);
+            if (nameLen > headW.remaining()) {
+                abort("Name too long");
+            }
 
             nameBuf.setBytes(headW.array(), headW.position(),
                             nameLen);
@@ -300,18 +359,23 @@
     }
 
     @Override
-    protected void sendRequest(HttpChannel http) throws IOException {
+    protected synchronized void sendRequest(HttpChannel http) throws IOException {
         if (serverMode) {
             throw new IOException("Only in client mode");
         }
-
+        if (!checkConnection(http)) {
+            return;
+        }
         MultiMap mimeHeaders = http.getRequest().getMimeHeaders();
         BBuffer headBuf = BBuffer.allocate();
         
         SpdyConnection.appendShort(headBuf, mimeHeaders.size() + 3);
         
         serializeMime(mimeHeaders, headBuf);
-
+        
+        if (headerCompression) {
+        }
+        
         // TODO: url - with host prefix , method
         // optimize...
         SpdyConnection.appendAsciiHead(headBuf, "version");
@@ -332,6 +396,11 @@
         out.putByte(0x00); 
         out.putByte(0x01);
         
+        CBuffer method = http.getRequest().method();
+        if (method.equals("GET") || method.equals("HEAD")) {
+            http.getOut().close();
+        }
+        
         if (http.getOut().isAppendClosed()) {
             out.putByte(0x01); // closed
         } else {
@@ -348,17 +417,32 @@
             http.channelId = 2 * lastOutStream.incrementAndGet() + 1;            
         }
         SpdyConnection.appendInt(out, http.channelId);
-        
-        channels.put(http.channelId, http);
+        http.setConnection(this);
+
+        synchronized (this) {
+            channels.put(http.channelId, http);            
+        }
         
         out.putByte(0x00); // no priority 
         out.putByte(0x00); 
         
         sendFrame(out, headBuf); 
 
+        if (http.outMessage.state == HttpMessage.State.HEAD) {
+            http.outMessage.state = HttpMessage.State.BODY_DATA;
+        }
+        if (http.getOut().isAppendClosed()) {
+            http.handleEndSent();
+        }
+
         // Any existing data
         //sendData(http);
     }
+
+    
+    public synchronized Collection<HttpChannel> getActives() {
+        return channels.values();
+    }
     
     @Override
     protected synchronized void sendResponseHeaders(HttpChannel http) throws IOException {
@@ -415,7 +499,8 @@
         // It seems piggibacking data is not allowed
         frameHead.putByte(0x00); 
 
-        SpdyConnection.append24(frameHead, headBuf.remaining() + 6);
+        int len = headBuf.remaining() + 6;
+        SpdyConnection.append24(frameHead, len);
 
 //        // Stream-Id, unused
         SpdyConnection.appendInt(frameHead, http.channelId);
@@ -474,11 +559,14 @@
         if (net == null) {
             return; // unit test
         }
+        outBytes.addAndGet(out.remaining());
         net.getOut().append(out);
         if (headBuf != null) {
             net.getOut().append(headBuf);
+            outBytes.addAndGet(headBuf.remaining());
         }
         net.startSending();
+        outFrames.incrementAndGet();
     }
 
     public synchronized void sendDataFrame(IOBuffer out2, int avail,
@@ -497,11 +585,14 @@
         // TODO: chunk if too much data ( at least at 24 bits)
         SpdyConnection.append24(outFrameBuffer, avail);
 
+        outBytes.addAndGet(outFrameBuffer.remaining() + avail);
         net.getOut().append(outFrameBuffer);
+
         if (avail > 0) {
             net.getOut().append(out2, avail);
         }
         net.startSending();
+        outDataFrames.incrementAndGet();        
     }
 
     static void appendInt(BBuffer headBuf, int length) throws IOException {
@@ -579,9 +670,11 @@
     
         static int FLAG_HALF_CLOSE = 1;
     
-        public void parse(BBuffer iob) throws IOException {
+        public void parse(SpdyConnection spdyConnection,
+                BBuffer iob) throws IOException {
             int b0 = iob.read();
             if (b0 < 128) {
+                // data frame 
                 c = false;
                 streamId = b0;
                 for (int i = 0; i < 3; i++) {
@@ -592,6 +685,10 @@
                 c = true;
                 b0 -= 128;
                 version = ((b0 << 8) | iob.read());
+                if (version != 1) {
+                    spdyConnection.abort("Wrong version");
+                    return;
+                }
                 b0 = iob.read();
                 type = ((b0 << 8) | iob.read());
             }
@@ -601,12 +698,23 @@
                 b0 = iob.read();
                 length = length << 8 | b0;
             }
-
+            
             iob.recycle();
         }
     
     }
     
+    @Override
+    protected void endSendReceive(HttpChannel http) throws IOException {
+        synchronized (this) {
+            HttpChannel doneHttp = channels.remove(http.channelId);
+            if (doneHttp != http) {
+                log.severe("Error removing " + doneHttp + " " + http);
+            }
+        }
+        httpConnector.cpool.afterRequest(http, this, true);
+    }
+    
     /** 
      * Framing error, client interrupt, etc.
      */
@@ -615,5 +723,86 @@
         
     }
 
+    
+    volatile boolean connecting = false;
+    volatile boolean connected = false;
+    
+    
+    private boolean checkConnection(HttpChannel http) throws IOException {
+        synchronized(this) {
+            if (net == null || !isOpen()) {
+                connected = false;
+            }
+        
+            if (!connected) {
+                if (!connecting) {
+                    // TODO: secure set at start ? 
+                    connecting = true;
+                    httpConnector.cpool.httpConnect(http, 
+                            target.toString(), 
+                            http.getRequest().isSecure(), this);
+                }
+                
+                synchronized (remoteHost) {
+                    remoteHost.pending.add(http);
+                    httpConnector.cpool.queued.incrementAndGet();
+                }
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public void handleConnected(IOChannel net) throws IOException {
+        HttpChannel httpCh = null;
+        if (!net.isOpen()) {
+            while (true) {
+                synchronized (remoteHost) {
+                    if (remoteHost.pending.size() == 0) {
+                        return;
+                    }
+                    httpCh = remoteHost.pending.remove();
+                }
+                httpCh.abort("Can't connect");
+            }            
+        }
+
+        synchronized (remoteHost) {
+            httpCh = remoteHost.pending.peek();
+        }
+        secure = httpCh.getRequest().isSecure();
+        if (secure) {
+            SslChannel ch1 = new SslChannel();
+            ch1.setSslContext(httpConnector.sslConnector.getSSLContext());
+            ch1.setSink(net);
+            net.addFilterAfter(ch1);
+            net = ch1;
+        }
+        
+        if (httpConnector.debugHttp) {
+            IOChannel ch1 = new DumpChannel("");
+            net.addFilterAfter(ch1);
+            net = ch1;                        
+        }
+        
+        setSink(net);
+        
+        synchronized(this) {
+            connecting = false;
+            connected = true;
+        }
+ 
+        while (true) {
+            synchronized (remoteHost) {
+                if (remoteHost.pending.size() == 0) {
+                    return;
+                }
+                httpCh = remoteHost.pending.remove();
+            }
+            sendRequest(httpCh);
+        }
 
+    }
 }
\ No newline at end of file

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java Wed Jan 13 02:07:25 2010
@@ -348,18 +348,25 @@
     }
     
     public int read(byte[] buf, int off, int len) throws IOException {
-        BBucket bucket = peekFirst();
         if (isClosedAndEmpty()) {
             return -1;
         }
-        if (bucket == null) {
-            return 0;
+        int rd = 0;
+        while (true) {
+            BBucket bucket = peekFirst();
+            if (bucket == null) {
+                return rd;
+            }
+            int toCopy = Math.min(len, bucket.remaining());
+            System.arraycopy(bucket.array(), bucket.position(), 
+                    buf, off + rd, toCopy);
+            bucket.position(bucket.position() + toCopy);
+            rd += toCopy;
+            len -= toCopy;
+            if (len == 0) {
+                return rd;
+            }
         }
-        int toCopy = Math.min(len, bucket.remaining());
-        System.arraycopy(bucket.array(), bucket.position(), buf, 
-                off, toCopy);
-        bucket.position(bucket.position() + toCopy);
-        return toCopy;
         
     }
 

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java Wed Jan 13 02:07:25 2010
@@ -27,7 +27,7 @@
     protected IOChannel app;
     
     protected String id;
-    protected String target;    
+    protected CharSequence target;    
 
     protected IOConnector connector;
 
@@ -136,7 +136,7 @@
         shutdownOutput();
         // Should it read the buffers ? 
         
-        if (getIn().isAppendClosed()) {
+        if (getIn() == null || getIn().isAppendClosed()) {
             return;
         } else {
             getIn().close();
@@ -146,11 +146,13 @@
     }
 
     public boolean isOpen() {
-        return !getIn().isAppendClosed() && !getOut().isAppendClosed();
+        return getIn() != null && 
+        getOut() != null && 
+        !getIn().isAppendClosed() && !getOut().isAppendClosed();
     }
     
     public void shutdownOutput() throws IOException {
-        if (getOut().isAppendClosed()) {
+        if (getOut() == null || getOut().isAppendClosed()) {
             return;
         } else {
             getOut().close();
@@ -290,5 +292,8 @@
         return target;
     }
     
+    public void setTarget(CharSequence target) {
+        this.target = target;
+    }
     
 }

Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java Wed Jan 13 02:07:25 2010
@@ -3,14 +3,15 @@
 package org.apache.tomcat.lite.service;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.List;
 
+import org.apache.tomcat.lite.http.HttpConnectionPool;
 import org.apache.tomcat.lite.http.HttpRequest;
 import org.apache.tomcat.lite.http.HttpResponse;
 import org.apache.tomcat.lite.http.HttpWriter;
 import org.apache.tomcat.lite.http.HttpChannel.HttpService;
-import org.apache.tomcat.lite.http.HttpConnector.ConnectionPool;
-import org.apache.tomcat.lite.http.HttpConnector.RemoteServer;
+import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer;
+import org.apache.tomcat.lite.http.HttpConnector.HttpConnection;
 import org.apache.tomcat.lite.io.IOChannel;
 
 /**
@@ -18,30 +19,32 @@
  */
 public class IOStatus implements HttpService {
 
-    private ConnectionPool pool;
+    private HttpConnectionPool pool;
 
-    public IOStatus(ConnectionPool pool) {
+    public IOStatus(HttpConnectionPool pool) {
         this.pool = pool;
     }
     
     @Override
     public void service(HttpRequest httpReq, HttpResponse httpRes)
             throws IOException {
-        ConnectionPool sc = pool;
+        HttpConnectionPool sc = pool;
         HttpWriter out = httpRes.getBodyWriter();
         
         httpRes.setContentType("text/plain");
+        // TODO: use JMX/DynamicObject to get all public info
         out.println("hosts=" + sc.getTargetCount());
         out.println("waiting=" + sc.getSocketCount());
         out.println("closed=" + sc.getClosedSockets());
         out.println();
 
-        for (Map.Entry<CharSequence, RemoteServer> e: sc.hosts.entrySet()) {
-            out.append(e.getKey());
+        for (RemoteServer remote: sc.getServers()) {
+            out.append(remote.target);
             out.append("=");
-            out.println(Integer.toString(e.getValue().connections.size()));
+            List<HttpConnection> connections = remote.getConnections();
+            out.println(Integer.toString(connections.size()));
 
-            for (IOChannel ch: e.getValue().connections) {
+            for (IOChannel ch: connections) {
                 out.println(ch.getId() + 
                         " " + ch.toString());
             }

Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java Wed Jan 13 02:07:25 2010
@@ -233,7 +233,7 @@
         URL url = new URL(path);
         HttpURLConnection connection = 
             (HttpURLConnection) url.openConnection();
-       // connection.setReadTimeout(100000);
+        connection.setReadTimeout(10000);
         connection.connect();
         int rc = connection.getResponseCode();
         InputStream is = connection.getInputStream();

Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java Wed Jan 13 02:07:25 2010
@@ -27,7 +27,7 @@
     /** 
      * Last http channel created by the connection
      */
-    HttpChannel http;
+    volatile HttpChannel http;
     
     // Input/output for the connection
     MemoryIOConnector.MemoryIOChannel net = new MemoryIOChannel();

Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java Wed Jan 13 02:07:25 2010
@@ -86,7 +86,7 @@
         httpClient.requestURI().set("/chunked/foo");
         httpClient.send();
         httpClient.readAll(bodyRecvBuffer, to);
-        assertTrue(bodyRecvBuffer.toString().indexOf("AAA") >= 0);
+        assertTrue(bodyRecvBuffer.toString(), bodyRecvBuffer.toString().indexOf("AAA") >= 0);
     }
 
     // Check waitResponseHead()

Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java Wed Jan 13 02:07:25 2010
@@ -9,17 +9,15 @@
 import junit.framework.TestCase;
 
 import org.apache.tomcat.lite.TestMain;
+import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer;
 import org.apache.tomcat.lite.io.IOBuffer;
-import org.apache.tomcat.lite.http.SpdyConnection.SpdyConnectionManager;
 
 public class SpdyTest extends TestCase {
     HttpConnector http11Con = TestMain.shared().getClient();
     
-    static HttpConnector spdyCon = DefaultHttpConnector.get()
-        .withConnectionManager(new SpdyConnectionManager());
+    static HttpConnector spdyCon = DefaultHttpConnector.get();
     
-    HttpConnector memSpdyCon = 
-        new HttpConnector(null).withConnectionManager(new SpdyConnectionManager());
+    HttpConnector memSpdyCon = new HttpConnector(null);
     
     public void testClient() throws IOException {
         HttpRequest req = 
@@ -43,7 +41,7 @@
         IOBuffer iob = new IOBuffer();
         iob.append(is);
         
-        SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection();
+        SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
         
         // By default it has a dispatcher buit-in 
         con.serverMode = true;
@@ -72,7 +70,7 @@
         IOBuffer iob = new IOBuffer();
         iob.append(is);
         
-        SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection();
+        SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
         
         // By default it has a dispatcher buit-in 
         con.serverMode = true;
@@ -95,8 +93,8 @@
     public void testLargeInt() throws Exception {
         
         IOBuffer iob = new IOBuffer();
-        iob.append(0xFF);
-        iob.append(0xFF);
+        iob.append(0x80);
+        iob.append(0x01);
         iob.append(0xFF);
         iob.append(0xFF);
 
@@ -105,12 +103,34 @@
         iob.append(0xFF);
         iob.append(0xFF);
 
-        SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection();
+        SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
         con.dataReceived(iob);
-        assertEquals(0x7FFF, con.currentInFrame.version);
+        assertEquals(1, con.currentInFrame.version);
         assertEquals(0xFFFF, con.currentInFrame.type);
         assertEquals(0xFF, con.currentInFrame.flags);
         assertEquals(0xFFFFFF, con.currentInFrame.length);
         
     }
+
+    // Does int parsing works ?
+    public void testBad() throws Exception {
+        
+            IOBuffer iob = new IOBuffer();
+            iob.append(0xFF);
+            iob.append(0xFF);
+            iob.append(0xFF);
+            iob.append(0xFF);
+
+            iob.append(0xFF);
+            iob.append(0xFF);
+            iob.append(0xFF);
+            iob.append(0xFF);
+
+            SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
+            con.dataReceived(iob);
+            
+            assertEquals(1, con.streamErrors.get());
+        
+    }
+
 }

Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java Wed Jan 13 02:07:25 2010
@@ -68,175 +68,184 @@
  * it seems there is a bug as well.
  */
 public class LiveHttpThreadedTest extends TestCase {
-  HttpConnector clientCon = TestMain.shared().getClient();
-  ThreadRunner tr;
-  static MBeanServer server;
-  
-  AtomicInteger ok = new AtomicInteger();
-  Object lock = new Object();
-  int reqCnt;
-
-  Map<HttpRequest, HttpRequest> active = new HashMap();
-  
-  public void test1000Async() throws Exception {
-      try {
-          asyncRequest(10, 100, false);
-      } finally {
-          dumpHeap("heapAsync.bin");
-      }
-      
-  }
-
-  public void test10000Async() throws Exception {
-      try {
-          asyncRequest(20, 500, false);
-      } finally {
-          dumpHeap("heapAsync.bin");
-      }
-  }
-  
-  public void xtest1000AsyncSpdy() throws Exception {
-      try {
-          asyncRequest(10, 20, true);
-      } finally {
-          dumpHeap("heapAsync.bin");
-      }
-      
-  }
-
-  public void xtest10000AsyncSpdy() throws Exception {
-      try {
-          asyncRequest(20, 500, true);
-      } finally {
-          dumpHeap("heapAsync.bin");
-      }
-  }
-  
-  public void asyncRequest(int thr, int perthr, boolean spdy) throws Exception {
-      reqCnt = thr * perthr;
-      if (spdy) {
-          // TODO: simpler API ( 'allowSpdy', etc ) - after negotiation is impl
-          clientCon.withConnectionManager(new SpdyConnection.SpdyConnectionManager());
-      }
-      long t0 = System.currentTimeMillis();
-      tr = new ThreadRunner(thr, perthr) {
-          public void makeRequest(int i) throws Exception {
-              HttpRequest cstate = clientCon.request("localhost", 8802);
-              synchronized (active) {
-                  active.put(cstate, cstate);
-              }
-              
-              cstate.requestURI().set("/hello");
-              cstate.setCompletedCallback(reqCallback);
-              
-              // Send the request, wait response
-              Thread.currentThread().sleep(20);
-              cstate.send();
+    HttpConnector clientCon = TestMain.shared().getClient();
+    HttpConnector serverCon = TestMain.shared().getTestServer();
+    ThreadRunner tr;
+    static MBeanServer server;
+
+    AtomicInteger ok = new AtomicInteger();
+    Object lock = new Object();
+    int reqCnt;
+
+    Map<HttpRequest, HttpRequest> active = new HashMap();
+
+    public void tearDown() throws IOException {
+        clientCon.cpool.clear();
+    }
+    
+    public void test1000Async() throws Exception {
+        try {
+            asyncRequest(10, 100, false);
+        } finally {
+            dumpHeap("heapAsync.bin");
+        }
+
+    }
+
+    public void test10000Async() throws Exception {
+        try {
+            asyncRequest(20, 500, false);
+        } finally {
+            dumpHeap("heapAsyncBig.bin");
+        }
+    }
+
+    public void test1000AsyncSpdy() throws Exception {
+        try {
+            asyncRequest(10, 100, true);
+        } finally {
+            dumpHeap("heapSpdy1000.bin");
+        }
+
+    }
+
+    public void test10000AsyncSpdy() throws Exception {
+        try {
+            asyncRequest(20, 500, true);
+        } finally {
+            dumpHeap("heapSpdy10000.bin");
+        }
+    }
+
+    public void asyncRequest(int thr, int perthr, 
+            final boolean spdy) throws Exception {
+        reqCnt = thr * perthr;
+        long t0 = System.currentTimeMillis();
+        tr = new ThreadRunner(thr, perthr) {
+            public void makeRequest(int i) throws Exception {
+                HttpRequest cstate = clientCon.request("localhost", 8802);
+                synchronized (active) {
+                    active.put(cstate, cstate);
+                }
+                if (spdy) {
+                    // Magic way to force spdy - will be replaced with
+                    // a negotiation.
+                    cstate.setProtocol("SPDY/1.0");
+                }
+                cstate.requestURI().set("/hello");
+                cstate.setCompletedCallback(reqCallback);
+                // no body
+                cstate.getBody().close();
+                // Send the request, wait response
+                Thread.currentThread().sleep(20);
+                cstate.send();
             }
-      };
-      tr.run();
-      assertEquals(0, tr.errors.get());
-      synchronized (lock) {
-          lock.wait(reqCnt * 100);
-      }
-      assertEquals(reqCnt, ok.get());
-      System.err.println(reqCnt + " Async requests: " + (System.currentTimeMillis() - t0));
-  }
-  
-  public void testURLRequest() throws Exception {
-      urlRequest(10, 100);
-  }
-
-  public void testURLRequest2() throws Exception {
-      urlRequest(20, 500);
-
-  }
-  
-  /** 
-   * HttpURLConnection client against lite.http server.
-   */
-  public void urlRequest(int thr, int cnt) throws Exception {
-      long t0 = System.currentTimeMillis();
-          
-      
-      try {
-          HttpConnector testServer = TestMain.getTestServer();
-          
-          tr = new ThreadRunner(thr, cnt) {
-
-              public void makeRequest(int i) throws Exception {
-                  try {
-                      ByteChunk out = new ByteChunk();
-                      HttpURLConnection con = TestMain.getUrl("http://localhost:8802/hello", out);
-                      if (con.getResponseCode() != 200) {
-                          errors.incrementAndGet();
-                      }
-                      if (!"Hello world".equals(out.toString())) {
-                          errors.incrementAndGet();
-                          System.err.println("bad result " + out);
-                      }
-                  } catch(Throwable t) {
-                      t.printStackTrace();
-                      errors.incrementAndGet();
-                  }
-              }
-          };
-          tr.run();
-          assertEquals(0, tr.errors.get());
-       
-          System.err.println(thr + " threads, " + (thr * cnt) + " total blocking URL requests: " + 
-                  (System.currentTimeMillis() - t0));
-
-          //assertEquals(testServer., actual)
-      } finally {
-          dumpHeap("heapURLReq.bin");
-      }
-  }
-  
-  // TODO: move to a servlet
-  private void dumpHeap(String file) throws InstanceNotFoundException,
-      MBeanException, ReflectionException, MalformedObjectNameException {
-
-      if (server == null) {
-          server = ManagementFactory.getPlatformMBeanServer();
-          
-      }
-      File f1 = new java.io.File(file);
-      if (f1.exists()) {
-          f1.delete();
-      }
-      server.invoke(new ObjectName("com.sun.management:type=HotSpotDiagnostic"),
-              "dumpHeap",
-              new Object[] {file, Boolean.FALSE /* live */}, 
-              new String[] {String.class.getName(), "boolean"});
-  }
-
-
-  RequestCompleted reqCallback = new RequestCompleted() {
-    @Override
-    public void handle(HttpChannel data, Object extraData) 
-        throws IOException {
-        String out = data.getIn().copyAll(null).toString();
-        if (200 != data.getResponse().getStatus()) {
-            System.err.println("Wrong status");
-            tr.errors.incrementAndGet();            
-        }
-        if (!"Hello world".equals(out)) {
-            tr.errors.incrementAndGet();
-            System.err.println("bad result " + out);
-        }        
-        synchronized (active) {
-            active.remove(data.getRequest());
-        }
-        data.release();
-        int okres = ok.incrementAndGet();
-        if (okres >= reqCnt) {
-            synchronized (lock) {
-                lock.notify();
+        };
+        tr.run();
+        assertEquals(0, tr.errors.get());
+        synchronized (lock) {
+            if (ok.get() < reqCnt) {
+                lock.wait(reqCnt * 100);
             }
         }
+        assertEquals(reqCnt, ok.get());
+        System.err.println(reqCnt + " Async requests: " + (System.currentTimeMillis() - t0));
     }
-  };
-  
-  
+
+    public void testURLRequest1000() throws Exception {
+        urlRequest(10, 100);
+    }
+
+    public void testURLRequest10000() throws Exception {
+        urlRequest(20, 500);
+
+    }
+
+    /** 
+     * HttpURLConnection client against lite.http server.
+     */
+    public void urlRequest(int thr, int cnt) throws Exception {
+        long t0 = System.currentTimeMillis();
+
+
+        try {
+            HttpConnector testServer = TestMain.getTestServer();
+
+            tr = new ThreadRunner(thr, cnt) {
+
+                public void makeRequest(int i) throws Exception {
+                    try {
+                        ByteChunk out = new ByteChunk();
+                        HttpURLConnection con = TestMain.getUrl("http://localhost:8802/hello", out);
+                        if (con.getResponseCode() != 200) {
+                            errors.incrementAndGet();
+                        }
+                        if (!"Hello world".equals(out.toString())) {
+                            errors.incrementAndGet();
+                            System.err.println("bad result " + out);
+                        }
+                    } catch(Throwable t) {
+                        t.printStackTrace();
+                        errors.incrementAndGet();
+                    }
+                }
+            };
+            tr.run();
+            assertEquals(0, tr.errors.get());
+
+            System.err.println(thr + " threads, " + (thr * cnt) + " total blocking URL requests: " + 
+                    (System.currentTimeMillis() - t0));
+
+            //assertEquals(testServer., actual)
+        } finally {
+            dumpHeap("heapURLReq.bin");
+        }
+    }
+
+    // TODO: move to a servlet
+    private void dumpHeap(String file) throws InstanceNotFoundException,
+    MBeanException, ReflectionException, MalformedObjectNameException {
+
+        if (server == null) {
+            server = ManagementFactory.getPlatformMBeanServer();
+
+        }
+        File f1 = new java.io.File(file);
+        if (f1.exists()) {
+            f1.delete();
+        }
+        server.invoke(new ObjectName("com.sun.management:type=HotSpotDiagnostic"),
+                "dumpHeap",
+                new Object[] {file, Boolean.FALSE /* live */}, 
+                new String[] {String.class.getName(), "boolean"});
+    }
+
+
+    RequestCompleted reqCallback = new RequestCompleted() {
+        @Override
+        public void handle(HttpChannel data, Object extraData) 
+        throws IOException {
+            String out = data.getIn().copyAll(null).toString();
+            if (200 != data.getResponse().getStatus()) {
+                System.err.println("Wrong status");
+                tr.errors.incrementAndGet();            
+            }
+            if (!"Hello world".equals(out)) {
+                tr.errors.incrementAndGet();
+                System.err.println("bad result " + out);
+            }        
+            synchronized (active) {
+                active.remove(data.getRequest());
+            }
+            data.release();
+            int okres = ok.incrementAndGet();
+            if (okres >= reqCnt) {
+                synchronized (lock) {
+                    lock.notify();
+                }
+            }
+        }
+    };
+
+
 }

Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java Wed Jan 13 02:07:25 2010
@@ -33,7 +33,7 @@
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
-public class WatchdogClient implements Test {
+public class WatchdogClient {
         
   protected String goldenDir; 
   protected String testMatch;
@@ -167,12 +167,10 @@
   protected String single;
   WatchdogTestCase singleTest;
   
-  @Override
   public int countTestCases() {
       return 1;
   }
 
-  @Override
   public void run(TestResult result) {
       getSuite();
       if (singleTest != null) {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org