You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2008/06/14 17:28:07 UTC

svn commit: r667814 - in /tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client: ./ AsyncHttp.java AsyncHttpCallback.java AsyncHttpPool.java BlockingHttp.java SocketPool.java

Author: costin
Date: Sat Jun 14 08:28:06 2008
New Revision: 667814

URL: http://svn.apache.org/viewvc?rev=667814&view=rev
Log:
After many iterations ( and about 2 years )... This is the common code for my version of the nio connector, will have an apr version 
as well. It is completely non-blocking at core, with a separate blocking interface. ProxyAdapter is the main example, can be used 
to break many of the comet limitations.
Of course, it is less stable and doesn't have most features yet of the nio/apr connectors - ssl, many setters are missing, it's 
just an experiment.


Added:
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java   (with props)

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java?rev=667814&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java Sat Jun 14 08:28:06 2008
@@ -0,0 +1,1485 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.client;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.ActionHook;
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.coyote.http11.Constants;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.HexUtils;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.Http11Parser;
+import org.apache.tomcat.util.http.HttpMessages;
+import org.apache.tomcat.util.http.MimeHeaders;
+import org.apache.tomcat.util.modeler.Registry;
+import org.apache.tomcat.util.net.SelectorCallback;
+import org.apache.tomcat.util.net.SelectorPool;
+import org.apache.tomcat.util.net.SelectorThread;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
+/**
+ * HTTP async client, using tomcat buffers and I/O.
+ * 
+ * Because so much is common between request and response, this can be 
+ * used in both directions - for initiating requests and as a server.
+ *
+ * 'Input', 'read', 'Recv' refers to information we get from the remote side - 
+ * the request body for server-mode or response body for client.
+ * 
+ * 'Output', 'write', 'Send' is for info we send - the post in client mode 
+ * and the response body for server mode.
+ * 
+ * TODO: maybe split in 2 - or use inheritance to override server-side ? 
+ * 
+ * @author Costin Manolache
+ */
+public class AsyncHttp {
+
+    private static final int HEADER_SIZE = 2048;
+
+    protected static final int NEED_MORE = -1;
+    static final int SUSPEND = -2;
+    static final int CLOSE = -3;
+    static final int ERROR = -4;
+    static final int DONE = -5;
+    static final int OK = 0;
+
+    static ByteChunk space = new ByteChunk(1);
+    static ByteChunk col = new ByteChunk(1);
+    static ByteChunk crlf = new ByteChunk(2);
+    static {
+        try {
+            space.append(' ');
+            col.append(':');
+            crlf.append('\r');
+            crlf.append('\n');
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    static byte[] END_CHUNK_BYTES = {
+        (byte) '\r', (byte) '\n', 
+        (byte) '0', 
+        (byte) '\r', (byte) '\n', 
+        (byte) '\r', (byte) '\n'};
+    
+    public enum State {
+        HEAD,
+        CHUNK_HEAD,
+        BODY_DATA,
+        DONE
+    }
+    static AtomicInteger serCnt = new AtomicInteger();
+    static Registry registry = Registry.getRegistry(null, null);
+
+    // ---- Buffers owned by the AsyncHttp object ----
+    protected Logger log = Logger.getLogger("AsyncHttp");
+    
+    // Used to receive an entity - headers + maybe some body
+    // read() must first consume extra data from this buffer.
+    // Next reads will be direct from socket.
+    protected ByteChunk headRecvBuf = new ByteChunk(HEADER_SIZE);
+    
+    // Used to serialize the outbound entity headers. Only head, no body
+    protected ByteChunk headSendBuf = new ByteChunk(HEADER_SIZE);
+
+    // ---------- Body read side ------------
+    protected State recvState = State.HEAD;
+    
+    protected boolean chunkedRecv = false;
+    protected boolean needsCRLFParseRecv = false;
+    protected long contentLengthRecv = -1; // C-L header
+    /** Bytes remaining to read in the current chunk or body ( if CL ) */
+    protected long remainingRecv = 0; // both chunked and C-L
+    
+    /** True: Http/1.x + chunked || C-L - 
+     *  False: for http/0.9, errors. -> will wait for close to detect end */    
+    protected boolean contentDelimitationRecv = false; 
+    
+    // Set if Exect: 100-continue was set on reqest.
+    // If this is the case - body won't be sent until
+    // server responds ( client ) and server will only
+    // read body after ack() - or skip to next request 
+    // without swallowing the body.
+    protected boolean expectation = false; 
+    
+    protected boolean closeOnEndRecv = false; 
+
+    // dataReceived should throw error;
+    protected boolean recvError = false;
+    
+    // ----------- Body write side ------------
+    protected State sendState;
+        
+    protected boolean chunkedSend = false;
+    protected boolean beforeFirstChunkSend = true;
+    
+    /** Remaining bytes - in the content or chunk  */
+    protected long remainingSend = 0;
+
+    /** Don't send any body - HEAD or 204..305 */
+    protected boolean noBodySend = false; 
+
+    /** 
+     * Close connection when done writting, no content-length/chunked, 
+     * or no keep-alive ( http/1.0 ) or error.
+     * 
+     * ServerMode: set if HTTP/0.9 &1.0 || !keep-alive
+     * ClientMode: not currently used
+     */
+    protected boolean closeOnEndSend = false;
+    
+    /** True when the output is closed, send done */
+    protected boolean sendDone = false;
+
+    /** True when service is done */
+    protected boolean serviceDone = false;
+    
+    /** Ready for recycle */
+    protected boolean allDone = false;
+    
+    // ---------- Buffers for body  -------------
+    
+    // One big buffer for raw data received. 
+    protected ByteChunk rawRecvBuf = new ByteChunk(5120);
+
+    // Pointer to some received data. This doesn't own the buffer 
+    // Call get again to reset to newer data, or wait to
+    // block until data is received
+    protected ByteChunk currentRecvBucket = new ByteChunk();
+
+    // Data to send - buffers are allocated by the calling app
+    // AsyncHttp 'owns' them while in transit, a callback will be called
+    // when each buffer is sent.
+    protected List<ByteChunk> sendBrigade = 
+        new LinkedList<ByteChunk>();
+    
+    // Buffer used for chunk length conversion.
+    protected byte[] sendChunkLength = new byte[10];
+    // Buffer to encode the chunked head
+    protected ByteChunk sendChunkBuffer = new ByteChunk();
+    
+    /** End chunk marker - will include chunked end or empty */
+    protected ByteChunk endSendBuffer = new ByteChunk();
+
+    // ---------------- Common ---------- 
+    protected BlockingHttp blockingHttp = new BlockingHttp(this);
+    
+    protected AsyncHttpCallback httpCallback = blockingHttp;
+    
+    protected SelectorData selectorData;
+    
+    protected Request req;
+    protected Response res;
+
+    protected Http11Parser parser = new Http11Parser();
+
+    protected SelectorThread selector;
+    protected SelectorPool client;
+    protected AsyncHttpSelectorCallback selectorCallback;
+    
+    /** To set when done, wait for next req */
+    protected SelectorCallback keepAliveCallback;
+
+    protected boolean debug = false; 
+
+    // ---------- Client only ------------
+    // Because Response uses String. They wrap data in headReadBuf
+    protected MessageBytes status = MessageBytes.newInstance();
+    protected MessageBytes proto = MessageBytes.newInstance();
+    protected MessageBytes statusMsg = MessageBytes.newInstance();
+
+    protected String dbgName = this.getClass().getSimpleName(); 
+    
+    // ----- Pools - sockets, heavy objects -------------
+    // If client mode - what host we are connected to.
+    protected String host;
+    protected int port;
+    protected AsyncHttpPool pool;
+    
+    // ------ JMX 
+    protected int ser; // id - for jmx registration and logs
+    
+    public void recycle() {
+        recycle(false);
+    }
+    
+    protected void recycle(boolean keepHead) {
+        if (!keepHead) {
+            headRecvBuf.recycle();
+            selectorData = null;
+        }
+        headSendBuf.recycle();
+        recvState = State.HEAD;
+        
+        
+        sendDone = false;
+        allDone = false;
+        serviceDone = false;
+        
+        blockingHttp.recycle();
+        
+        chunkedRecv = false;
+        needsCRLFParseRecv = false;
+        remainingRecv = 0; 
+        contentLengthRecv = -1;
+        contentDelimitationRecv = false;
+        expectation = false;
+        closeOnEndRecv = false;
+        recvError = false;
+        
+        sendState = State.HEAD;
+        chunkedSend = false;
+        beforeFirstChunkSend = true;
+        remainingSend = 0;
+        noBodySend = false; 
+        closeOnEndSend = false;
+        
+        
+        //protected LinkedList<ByteChunk> readBuffers = new LinkedList<ByteChunk>();
+        rawRecvBuf.recycle();
+        currentRecvBucket.recycle();
+//        protected ArrayList<ByteChunk> currentSendBuffers = 
+//            new ArrayList<ByteChunk>();
+        sendChunkBuffer.recycle();
+        endSendBuffer.recycle();
+        
+        // Don't change httpCallback;
+            
+        // don't reset req, res
+        parser.recycle();
+        
+        // don't change 
+        // selector;
+        //selectorCallback;
+        //keepAliveCallback;
+        
+        //client;
+        //serverSide = false;
+
+        status.recycle();
+        proto.recycle();
+        statusMsg.recycle();
+        
+        host = null;
+        //protected ClientPool pool;
+
+        // Default values in req.
+        req.recycle();
+        // defaults:
+        req.method().setString("GET");
+        req.protocol().setString("HTTP/1.1");
+        res.recycle();
+        if (debug) {
+            log.info(dbgName + ": recycle " + ser);
+        }
+    }
+
+    protected AsyncHttp(SelectorPool client, boolean init) {
+        if (client == null) {
+            client = SelectorPool.defaultPool();
+        }
+        selectorCallback = new AsyncHttpSelectorCallback(); 
+        ser = serCnt.incrementAndGet();
+        try {
+            registry.registerComponent(this, ":name=AsyncHttp-" + 
+                    ser, "AsyncHttp");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        this.selector = client.getSelector();
+
+        if (init) {
+            req = new Request();
+            //req.setInputBuffer(new RawInputBuffer(input));
+
+            res = new Response();
+            //res.setOutputBuffer(new ByteChunkOutputBuffer(output));
+            ActionHook clientHook = new ActionHook() {
+                public void action(ActionCode actionCode, Object param) {
+                    //System.err.println("Action: " + actionCode + " " + param);
+                }
+            };
+            res.setHook(clientHook);
+            req.setResponse(res);
+            recycle();
+        }
+        //initializeFilters();
+        
+        sendChunkLength[8] = (byte) '\r';
+        sendChunkLength[9] = (byte) '\n';
+        log = Logger.getLogger(this.getClass().getSimpleName());
+        
+        if (debug) {
+            log.info("Create new client ser=" + ser);
+        }
+    }
+
+    public AsyncHttp(SelectorPool client) {
+        this(client, true);
+    }
+
+    protected AsyncHttp() {
+    }
+
+    public void setClientPool(AsyncHttpPool pool) {
+        this.pool = pool;
+    }
+
+    public void setRequest(Request req, Response res) {
+        this.req = req;
+        this.res = res;
+    }
+    
+    public void setSelectorData(SelectorData selData) {
+        this.selectorData = selData;
+    }
+
+    // ----- Callback methods --------
+    
+    /**
+     * Read
+     * @return NEED_MORE - read all available data
+     * @return CLOSE - other side closed connection
+     * @return > 0 - number of bytes received
+     */
+    private int tryRead(SelectorData selT, ByteChunk buf)
+            throws IOException {
+        ByteBuffer bb = buf.getWriteByteBuffer();
+        int done = selT.sel.readNonBlocking(selT, bb);
+        buf.updateWriteByteBuffer();
+        
+        if (done < 0) {
+            // End of input - other side closed, no more data
+            if (debug) {
+                log.info(dbgName + ": close while reading  " + ser);    
+            }
+            return CLOSE;
+        }
+        if (done == 0) {
+            if (debug) {
+                log.info(dbgName + ": wait more data ser=" + ser + 
+                    " buf=" + buf.length() + " s=" + buf.getStart() +
+                    " e=" + buf.getEnd());
+                //new Throwable().printStackTrace();
+            }
+            return NEED_MORE;        
+        }
+        if (debug) {
+            log.info(dbgName + ": read " + ser + " len=" + done);
+        }
+        return done;
+    }
+
+    /** 
+     * Read and process a chunk of head 
+     * 
+     */
+    protected int headDataReceived(SelectorData selT) throws IOException {
+        while (recvState == State.HEAD) {
+            if (headRecvBuf.length() == 0) {
+                int done = tryRead(selT, headRecvBuf);
+                if (done == CLOSE) {
+                    // TODO: http/0.9 ? 
+                    selT.sel.close(selT); // too early - we don't have the head
+                    return done;
+                } else if (done == NEED_MORE) {
+                    // End of input - other side closed, no more data
+                    //System.err.print("R1-" + ser + " ");    
+                    return done;
+                }
+                if (debug ) {
+                    log.info(dbgName + ": headDataReceived: ser=" + ser + " rd=" + 
+                            done + " head=" + headRecvBuf.length() + 
+                            " body=" + rawRecvBuf.length());
+                }
+            } else {
+                if (debug ) {
+                    log.info(dbgName + ": headDataReceived, existing data: " + 
+                            ser + " " +  " head=" + headRecvBuf.length());
+                }
+            }
+
+            //Parse the response
+            parser.setBuffer(headRecvBuf.getBuffer(), headRecvBuf.getStart(), 
+                    headRecvBuf.getEnd());
+            int dataStart = parseHead();
+            if (dataStart == NEED_MORE) {
+                return NEED_MORE;
+            }
+            // Update the buffer - should contain only the payload
+            headRecvBuf.setOffset(dataStart);
+            if (headRecvBuf.length() > 0) {
+                rawRecvBuf.append(headRecvBuf); // remaining
+                if (debug ) {
+                    log.info(dbgName + ": headReceived, extra data : " + ser + " " + 
+                            " buf=" + rawRecvBuf.length() + "\n" + headRecvBuf);
+                }
+            } else if (rawRecvBuf.length() > 0) {
+                log.warning("XX Unexpected remaining data \n" + rawRecvBuf);
+            }
+            
+            // Now leave the headRecvBuf to point to the full head,
+            // for debug or forwarding
+            headRecvBuf.setOffset(0);
+            headRecvBuf.setEnd(dataStart);
+            
+            headersReceived();
+            return DONE;
+        }
+        // TODO: if GET/HEAD - request received..
+        return NEED_MORE;
+    }
+
+    /**
+     * Process the head, overriden for server-side.
+     * @return NEED_MORE or number of extra body bytes in the head buffer 
+     */
+    protected int parseHead() throws IOException {
+        boolean ok = parser.parseResponseLine(proto, status, statusMsg);
+        if (!ok) {
+            return NEED_MORE;
+        }
+
+        int dataStart = parser.parseHeaders(res.getMimeHeaders());
+        return (dataStart < 0) ? NEED_MORE : dataStart;
+    }
+
+    public boolean isReadContentDelimited() {
+        return chunkedRecv || contentLengthRecv >= 0;
+    }
+    
+    protected void headersReceived() throws IOException {
+        // We have the response head, now flush remaining data.
+        if (res != null) {
+            try {
+                res.setStatus(status.getInt());
+                res.setMessage(statusMsg.toString());
+            } catch (Throwable t) {
+                log.warning("Invalid status " + status + " " + statusMsg);
+            }
+        }
+        // Extract transfer data
+        processReadTransferHeaders(res.getMimeHeaders());
+        if (contentLengthRecv >= 0) {
+            // Update response - it's a cached value
+            res.setContentLength(contentLengthRecv);
+        }
+
+        if (!isReadContentDelimited()) {
+            closeOnEndRecv = true;
+            closeOnEndSend = true;
+        }
+        //Unlock head notification
+        if (httpCallback != null) {
+            httpCallback.headersReceived(this, req, res);
+        }
+    }  
+    
+    /*
+     * Called when sending, receiving and processing is done.
+     * Can be called:
+     *  - from IO thread, if this is a result of a read/write event that 
+     *  finished the send/recev pair.
+     *  - from an arbitrary thread, if read was complete and the last write
+     *  was a success and done in that thread ( write is not bound to IO thr)
+     * 
+     */
+    protected void handleEndSendReceive(SelectorData selT) throws IOException {
+        if (httpCallback != null) {
+            httpCallback.done(this, false, null);
+        }
+        if (closeOnEndSend) {
+            selT.sel.close(selT);
+        }
+        if (debug) {
+            log.info("client send/receive done, close=" + 
+                closeOnEndSend);
+        }
+        maybeRelease();
+    }
+    
+    /** 
+     * called from IO thread OR servlet thread when last block has been sent.
+     * 
+     * @param selT 
+     * @throws IOException 
+     */
+    protected void handleEndSent(SelectorData selT) throws IOException {
+        if (debug) {
+            log.info("OUT body sent done " + ser + " " + sendState);
+        }
+        if (sendState == State.DONE) {
+            // Only once.
+            log.severe("Duplicate END SEND");
+            return;
+        }
+        // Send final marker for chunked encoding
+        
+        // TODO: close out stream ? 
+        if (closeOnEndSend) {
+            selector.close(selT);
+        }
+        
+        if (httpCallback != null) {
+            httpCallback.bodySent(this, false, null);
+        }
+        // Make sure the send/receive callback is called once
+        synchronized(this) {
+            sendState = State.DONE;
+            if (recvState == State.DONE) {
+                handleEndSendReceive(selT);
+            }
+        }
+    }
+
+    /**
+     * Called from IO thread, after the request body 
+     * is completed ( or if there is no req body )
+     * @throws IOException 
+     */
+    protected void handleEndReceive(boolean frameError) throws IOException {
+        if (debug) {
+            log.info("handleEndReceive(): IN body recv done " + ser + 
+                    " " + frameError + " recv=" + recvState);
+        }
+        if (frameError) {
+            closeOnEndSend = true;
+            // TODO: next read() should throw exception !!
+            recvError = true;
+        }
+
+        if (httpCallback != null) {
+            httpCallback.bodyReceived(this, null, res, false, null);
+        }
+
+        synchronized(this) {
+            recvState = State.DONE;
+            if (sendState == State.DONE) {
+                handleEndSendReceive(selectorData);
+            }
+        }
+    }
+
+    /** 
+     * Called when raw body data is received.
+     * Callback should not consume past the end of the body.
+     *  
+     */
+    public int rawDataReceived(SelectorData selT) throws IOException {
+        int rc = NEED_MORE; // keep receiving
+        if (rawRecvBuf.length() == 0) {
+            // Only called from headers, if GET/HEAD
+            handleEndReceive(false);
+            return DONE;
+        }
+        while (recvState == State.BODY_DATA) {
+            if (chunkedRecv && remainingRecv == 0) {
+                if (needsCRLFParseRecv) {
+                    // TODO: Trailing headers
+                    rc = parseCRLF(rawRecvBuf);
+                    if (rc == NEED_MORE) {
+                        return rc; // continue receiving
+                    }
+                    needsCRLFParseRecv = false;
+                    if (rawRecvBuf.length() == 0) {
+                        return NEED_MORE;
+                    }
+                }
+                rc = parseChunkHeader(rawRecvBuf);
+                if (rc == NEED_MORE) {
+                    return rc; // continue receiving
+                } else if (rc == ERROR) {
+                    handleEndReceive(true);
+                    return rc;
+                } else if (rc == 0) {
+                    // last chunk, we just parsed it
+                    handleEndReceive(false);
+                    return DONE;
+                } else {
+                    remainingRecv = rc;
+                }
+            } else if (contentLengthRecv >= 0 && remainingRecv == 0) {
+                // Content-length case, we're done reading
+                handleEndReceive(false);
+                return DONE;
+            }
+            
+            // Remaining data in buffer, after parsing the header
+            if (rawRecvBuf.length() == 0) {
+                return NEED_MORE;
+            } else {
+                int lenToConsume = (int) remainingRecv;
+                if (remainingRecv > rawRecvBuf.length() ||
+                        (!chunkedRecv && contentLengthRecv < 0)) { // close to end
+                    lenToConsume = rawRecvBuf.length();
+                }
+                currentRecvBucket.setBytes(rawRecvBuf.getBuffer(), 
+                        rawRecvBuf.getOffset(), lenToConsume);
+                
+                // Consume the data from the rawReadBuffer, it is now 
+                // in currentRecvBuffer
+                // This is not an empty buffer - i.e. not replaced
+                rawRecvBuf.setOffset(rawRecvBuf.getOffset() + lenToConsume);
+
+                if (httpCallback != null) {
+                        httpCallback.dataReceived(this, res, 
+                                currentRecvBucket,
+                                rawRecvBuf,
+                                null);
+                    // at this point rawReadBuf may be replaced.
+                }
+
+
+                // needs to consume all or replace it:
+                // - set rawReadBuf to a new buffer if it didn't consume all
+                // We could continue appending - but that's a bit more complex
+                // and may use more memory in total - with replacing the 
+                // buffer we can use less.
+                if (!chunkedRecv && contentLengthRecv < 0) {
+                    // continue receiving until close. We processed the buffer. 
+                    return NEED_MORE;
+                } else if (remainingRecv == lenToConsume) {
+                    // Leave unconsumed data in buf
+                    remainingRecv = 0;
+                    if (chunkedRecv) {
+                        needsCRLFParseRecv = true;
+                    } else { // C-L
+                        handleEndReceive(false);
+                        if (debug) {
+                            log.info("RAW: AFTER END RECEIVE " + this);
+                        }
+                        return DONE;
+                    }
+                    continue; // will parse chunk head or deal with end 
+                } else {
+                    // Consumed all data in rawReadBuf, need more
+                    remainingRecv -= lenToConsume;          
+                    return NEED_MORE;
+                }
+            }
+        }
+        
+        if (recvState == State.DONE) {
+            if (rawRecvBuf.length() > 0) {
+                // I don't want more. 
+                // read interest is only suspended if next req is sent
+                // while processing the current one - don't want to 
+                // receive the whole post body, but keep what was sent.
+                // TODO: selT.readInterest(this, false);
+                // We read too much, leave it there for next request.
+                handleReceivePastEnd(selT);
+            }
+            // we must read the current buffer, or select() will be sad
+        }
+        return DONE;
+        
+//        log.info("Receive done " + req.requestURI() +  " " + 
+//                rawReadBuf.length());
+        
+        
+//        else if (rc == DONE){
+//            if (debug ) {
+//                log.info(dbgName + ": body recv done: " + ser);
+//            }
+//            recvState = State.DONE;
+//            boolean close = bodyReceived();
+//            // done reading - will be re-enabled when keep-alive is set
+//            //selT.readInterest(this, false);
+//            return;
+//        } else {
+//            log.warning("Untested read suspend");
+//            //selT.readInterest(this, false);
+                
+    }
+
+
+    protected void handleReceivePastEnd(SelectorData selT) throws IOException {
+        log.info(this.toString() + " CLIENT Read past end\n" + rawRecvBuf + " " 
+                + rawRecvBuf.length());
+        closeOnEndRecv = true;
+        closeOnEndSend = true;
+        // Force close
+        selector.close(selT);
+    }
+
+    protected int processReadTransferHeaders(MimeHeaders headers) {
+        MessageBytes transferEncodingValueMB =
+            headers.getValue("transfer-encoding");
+
+        if (transferEncodingValueMB != null) {
+            String transferEncodingValue = transferEncodingValueMB.toString();
+            // Parse the comma separated list. "identity" codings are ignored
+            int startPos = 0;
+            int commaPos = transferEncodingValue.indexOf(',');
+            String encodingName = null;
+            while (commaPos != -1) {
+                encodingName = transferEncodingValue.substring
+                (startPos, commaPos).toLowerCase().trim();
+                if ("chunked".equalsIgnoreCase(encodingName)) {
+                    chunkedRecv = true;
+                } else {
+                    return 501; // Currently only chunked is supported for 
+                    // transfer encoding.
+                }
+                startPos = commaPos + 1;
+                commaPos = transferEncodingValue.indexOf(',', startPos);
+            }
+            encodingName = transferEncodingValue.substring(startPos)
+            .toLowerCase().trim();
+            if ("chunked".equals(encodingName)) {
+                chunkedRecv = true;
+                contentDelimitationRecv = true;
+            } else {
+                return 501; // Currently only chunked is supported for 
+                // transfer encoding.
+            }
+        }
+        // Parse content-length header
+        MessageBytes clB = headers.getUniqueValue("content-length");
+        contentLengthRecv = (clB == null || clB.isNull()) ? -1 : clB.getLong();
+        if (contentLengthRecv >= 0) {
+            remainingRecv = contentLengthRecv;
+            contentDelimitationRecv = true;
+            if (chunkedRecv) {
+                return 501; // both c-l and chunked
+            }
+        }
+
+        // Content encoding - set it on the buffer, will be processed in blocking
+        // mode, after transfer encoding.
+        MessageBytes contentEncodingValueMB =
+            headers.getValue("content-encoding");
+
+//        if (contentEncodingValueMB != null) {
+//            if (contentEncodingValueMB.equals("gzip")) {
+//                buffer.addActiveFilter(gzipIF);
+//            }
+//            // TODO: other encoding filters
+//            // TODO: this should be separate layer
+//        }    
+
+        // TODO:
+//        if (!contentDelimitation) {
+//            // If there's no content length 
+//            // (broken HTTP/1.0 or HTTP/1.1), assume
+//            // the client is not broken and didn't send a body
+//            appReadBuf.addActiveFilter
+//                    (inputFilters.get(Constants.VOID_FILTER));
+//            contentDelimitation = true;
+//        }
+        
+        // TODO(1): Connection:close handling
+
+        return 0;
+    }
+
+    // ---------------- Writting ------------------------------- 
+    
+    /** 
+     * Start write.
+     * 
+     * Unlike OutputBuffer in coyote, the byte chunk will be consumed, 
+     * to allow easier handling of non-blocking. After startWrite is called
+     * the caller should no longer modify the buffer until 
+     * callback chunkWritten() is called.  
+     * 
+     * @param bc ByteChunk - will be consumed, like ByteChannel.write
+     * @return
+     * @throws IOException
+     */
+    public boolean startWrite(ByteChunk bc) 
+            throws IOException {
+        if (sendDone) {
+            throw new IOException("Output closed");
+        }
+        // Either us or the selector thread sending ( in handleWrite ).
+        synchronized (sendBrigade) {
+            sendBrigade.add(bc);
+
+            // Attempt to write - now, maybe we don't need to worry about 
+            // interest
+            sendData(selectorData);
+            if (sendBrigade.size() == 0) {
+                if (debug) {
+                    log.info("Sent all data in-thread");
+                }
+                return true;
+            }
+        }
+        if (debug) {
+            log.info(dbgName + ": added write buffer for later"); 
+        }
+        return false; // need to wait
+    }
+
+    /** 
+     * Called from selector thread when 'write' is possible, or from regular
+     * threads when buffers can be written.
+     * 
+     * This is synchronized on 'currentSendBuffers'.
+     * 
+     * This is the only place where WRITE interest can be added or removed.
+     * It also manages the 'writeInterest' queue if running in the network 
+     * thread. Again: no other code in the selector thread or outside should
+     * register or clear write interst except this code. 
+     */
+    protected int sendData(SelectorData selT) throws IOException {
+        int written = 0;
+        synchronized (sendBrigade) {
+            while (sendBrigade.size() > 0) {
+                // TODO: only send < remainingWrite, if buffer
+                // keeps changing after startWrite() is called (shouldn't)
+
+                ByteChunk current = sendBrigade.get(0);
+                // Chunked ? Do we need a size header ? 
+                if (chunkedSend  
+                        && current != headSendBuf 
+                        && current != endSendBuffer
+                        && remainingSend == 0
+                        && !noBodySend) {
+                    prepareChunkHeader(); // will insert a new buffer
+                    current = sendBrigade.get(0); 
+                }
+                
+                // TODO: use ByteBuffer[]
+                int done = 0;
+                if (current.getLength() > 0) { // including end marker
+                    if (noBodySend) { // HEAD - pretend to send
+                        done = current.length();
+                        current.setOffset(current.getEnd());
+                    } else {
+                        ByteBuffer bb = current.getReadByteBuffer();
+                        done = selT.sel.writeNonBlocking(selT, 
+                                bb);
+                        current.updateReadByteBuffer();
+                    }
+                }
+                if (done < 0) { // error
+                    handleError("writeRequest");
+                    selT.sel.close(selT);
+                    return CLOSE;
+                }
+                
+                written += done;
+                
+                if (current != headSendBuf && 
+                        current != sendChunkBuffer && 
+                        current != endSendBuffer && 
+                        remainingSend > 0) {
+                
+                    remainingSend -= done;
+                    if (!chunkedSend && !closeOnEndSend &&
+                            remainingSend < 0) {
+                        // TODO: make sure we only write max.
+                        log.severe(dbgName + ": write more than Content-Length");
+                        selT.sel.close(selT);
+                        return CLOSE;
+                    }
+                }
+                
+                if (current.getLength() > 0) { // didn't finish
+                    if (debug) {
+                        log.info(dbgName + ": waiting for write buffer"); 
+                    }
+                    // 2 cases:
+                    //  - in sel thread: change at once.
+                    //  - in regular thread - writeInterest updated, on next
+                    // cycle dataWritable() will be called. 
+                    selT.sel.writeInterest(selT, true);
+                    return NEED_MORE;
+                } else {
+                    // DONE !!!
+                    if (current == headSendBuf) {
+                        // done sending req.
+                        headersSent();
+                    }
+                    
+                    // Next buffer 
+                    sendBrigade.remove(0);
+                    if (current == endSendBuffer) {
+                        handleEndSent(selT);
+                        sendDone = true;
+                        return DONE;
+                    }
+                    if (httpCallback != null) {
+                        httpCallback.dataWritten(current);
+                    }
+
+                    // size > 0 -> continue
+//                    if (outBrigade.size() > 0) {
+//                        log.info("More to write " + outBrigade.size());
+//                    }
+                }
+            }
+            // size == 0
+            // Only selector thread can remove interest
+            if (selT.sel.isSelectorThread()) {
+                selT.sel.writeInterest(selT, false);
+            }
+            return SUSPEND;
+        }
+        
+    }
+    
+    protected void headersSent() throws IOException {
+      sendState = State.BODY_DATA;      
+    }
+    
+ 
+
+    /** 
+     * Request to finish sending - final chunk marker.
+     * 
+     * TODO: detect Content-Length violations, close
+     *  
+     * Called explicitely by servlet or at the end of request.
+     */
+    public boolean endSend(AsyncHttp cb) throws IOException {
+        // Finished sending the last part of the response
+        synchronized (this) {
+            if (sendDone) {
+                // don't do this more than once
+                return false;
+            }
+
+            if (debug) {
+                log.info(dbgName + ": end sending c=" + chunkedSend
+                        + " ser=" + ser);
+            }
+            if (chunkedSend) {
+                if (beforeFirstChunkSend) {
+                    endSendBuffer.setBytes(END_CHUNK_BYTES, 2, 
+                            END_CHUNK_BYTES.length - 2);
+                } else {
+                    endSendBuffer.setBytes(END_CHUNK_BYTES, 0, 
+                            END_CHUNK_BYTES.length);                
+                }
+            } else {
+                endSendBuffer.reset();
+            }
+            startWrite(endSendBuffer);
+        } 
+        return sendBrigade.size() == 0;
+    }
+    
+    private void prepareChunkHeader() {
+        for (int i = 0; i < sendBrigade.size(); i++) {
+            ByteChunk bc = sendBrigade.get(i);
+            if (bc != endSendBuffer) {
+                remainingSend += bc.length();
+            }
+        }
+        // TODO: send chunk head.
+        int pos = 7;
+        int current = (int) remainingSend;
+        while (current > 0) {
+            int digit = current % 16;
+            current = current / 16;
+            sendChunkLength[pos--] = HexUtils.HEX[digit];
+        }
+        if (!beforeFirstChunkSend) {
+            sendChunkLength[pos--] = (byte) '\n';
+            sendChunkLength[pos--] = (byte) '\r';
+        } else {
+            beforeFirstChunkSend = false;
+        }
+        
+        sendChunkBuffer.setBytes(sendChunkLength, pos + 1, 9 - pos);
+        
+        sendBrigade.add(0, sendChunkBuffer);
+    }
+
+    /**
+     * Parse the header of a chunk.
+     * A chunk header can look like 
+     * A10CRLF
+     * F23;chunk-extension to be ignoredCRLF
+     * The letters before CRLF but after the trailer mark, must be valid hex digits, 
+     * we should not parse F23IAMGONNAMESSTHISUP34CRLF as a valid header
+     * according to spec
+     */
+    protected int parseChunkHeader(ByteChunk bc)
+            throws IOException {
+
+        int result = 0;
+        boolean eol = false;
+        boolean readDigit = false;
+        boolean trailer = false;
+        byte[] buf = bc.getBuffer();
+        int pos = bc.getStart();
+        int lastValid = bc.getEnd();
+
+        while (!eol) {
+            if (pos >= lastValid) {
+                return NEED_MORE;
+            }
+
+            if (buf[pos] == Constants.CR) {
+            } else if (buf[pos] == Constants.LF) {
+                eol = true;
+            } else if (buf[pos] == Constants.SEMI_COLON) {
+                trailer = true;
+            } else if (!trailer) { 
+                //don't read data after the trailer
+                if (HexUtils.DEC[buf[pos]] != -1) {
+                    readDigit = true;
+                    result *= 16;
+                    result += HexUtils.DEC[buf[pos]];
+                } else {
+                    //we shouldn't allow invalid, non hex characters
+                    //in the chunked header
+                    log.info("Chunk parsing error1 " + buf[pos] + " " + bc);
+                    return ERROR;
+                }
+            }
+            pos++;
+        }
+        if (!readDigit) {
+            log.info("Chunk parsing error2 " + bc);
+            return ERROR;
+        }
+
+        // Consume the len
+        bc.setOffset(pos);
+        return result;
+    }
+
+    /**
+     * Parse CRLF at end of chunk.
+     */
+    protected int parseCRLF(ByteChunk bc)
+    throws IOException {
+
+        byte[] buf = bc.getBuffer();
+        int pos = bc.getStart();
+        int lastValid = bc.getEnd();
+        boolean eol = false;
+
+        while (!eol) {
+            if (pos >= lastValid) {
+                return NEED_MORE;
+            }
+            if (buf[pos] == Constants.CR) {
+            } else if (buf[pos] == Constants.LF) {
+                eol = true;
+            } else {
+                throw new IOException("Invalid CRLF");
+            }
+            pos++;
+        }
+        bc.setOffset(pos);
+        return 0;
+    }
+
+    // ----- End Selector thread callbacks ----
+    public void open(String url) throws IOException {
+        open(new URL(url));
+    }
+
+    public void open(URL url) throws IOException {
+        String host = url.getHost();
+        int port = url.getPort();
+        String path = url.getFile(); // path + qry
+        req.requestURI().setString(path);
+        connect(host, port);
+    }
+
+    /**
+     * Determine if we must drop the connection because of the HTTP status
+     * code.  Use the same list of codes as Apache/httpd.
+     */
+    protected boolean statusDropsConnection(int status) {
+        return status == 400 /* SC_BAD_REQUEST */ ||
+        status == 408 /* SC_REQUEST_TIMEOUT */ ||
+        status == 411 /* SC_LENGTH_REQUIRED */ ||
+        status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
+        status == 414 /* SC_REQUEST_URI_TOO_LARGE */ ||
+        status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
+        status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
+        status == 501 /* SC_NOT_IMPLEMENTED */;
+    }
+
+    /** 
+     * Convert the request to bytes, ready to send.
+     */
+    public static void serializeRequest(Request req, 
+                                        ByteChunk reqBuf) throws IOException {
+        req.method().toBytes();
+        if (!req.unparsedURI().isNull()) {
+            req.unparsedURI().toBytes();
+        }
+        req.protocol().toBytes();
+
+        reqBuf.append(req.method().getByteChunk());
+        reqBuf.append(space);
+        if (req.unparsedURI().isNull()) {
+            req.requestURI().toBytes();
+
+            reqBuf.append(req.requestURI().getByteChunk());      
+        } else {
+            reqBuf.append(req.unparsedURI().getByteChunk());
+        }
+        reqBuf.append(space);
+        reqBuf.append(req.protocol().getByteChunk());
+        reqBuf.append(crlf);
+        // Headers
+        MimeHeaders mimeHeaders = req.getMimeHeaders();
+        boolean hasHost = false;
+        for (int i = 0; i < mimeHeaders.size(); i++) {
+            MessageBytes name = mimeHeaders.getName(i);
+            name.toBytes();
+            reqBuf.append(name.getByteChunk());
+            if (name.equalsIgnoreCase("host")) {
+                hasHost = true;
+            }
+            reqBuf.append(col);
+            mimeHeaders.getValue(i).toBytes();
+            reqBuf.append(mimeHeaders.getValue(i).getByteChunk());
+            reqBuf.append(crlf);
+        }
+        if (!hasHost) {
+            reqBuf.append("Host: localhost\r\n".getBytes(), 0, 17);
+        }
+        reqBuf.append(crlf);
+    }
+
+    /** 
+     * Convert the response to bytes, ready to send.
+     */
+    public static void serializeResponse(Response res, 
+                                         ByteChunk reqBuf) throws IOException {
+        reqBuf.append(res.getRequest().protocol()).append(' ');
+        String status = Integer.toString(res.getStatus());   
+        reqBuf.append(status);
+        if (res.getMessage() != null) {
+            reqBuf.append(' ').append(res.getMessage());
+        } else {
+            reqBuf.append(' ').append(HttpMessages.getMessage(res.getStatus()));
+        }
+        reqBuf.append(crlf);
+        // Headers
+        MimeHeaders mimeHeaders = res.getMimeHeaders();
+        long cl = -1;
+        boolean hasChunked = false;
+        for (int i = 0; i < mimeHeaders.size(); i++) {
+            MessageBytes name = mimeHeaders.getName(i);
+            name.toBytes();
+            reqBuf.append(name.getByteChunk());
+            if (name.equalsIgnoreCase("content-length")) {
+                cl = mimeHeaders.getValue(i).getLong();
+            }
+            reqBuf.append(col);
+            reqBuf.append(' ');
+            mimeHeaders.getValue(i).toBytes();
+            reqBuf.append(mimeHeaders.getValue(i).getByteChunk());
+            reqBuf.append(crlf);
+        }
+        // TODO: add the rest of the magic from existing connectors
+        reqBuf.append(crlf);
+    }
+
+    public void handleError(String type) {
+        System.err.println("Error " + type + " " + sendState);
+    }
+
+    public Request getRequest() {
+        return req;
+    }
+    
+    public Response getResponse() {
+        return res;
+    }
+
+    public void setCallback(AsyncHttpCallback c) {
+        this.httpCallback = c;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append(dbgName + "-").append(ser).append(",rs=").append(getState())
+        .append(")");
+        return sb.toString();
+    }
+
+
+    // ----------------- Pool management -------------
+
+    /** 
+     * Can be called from any thread.
+     * 
+     * @param host
+     * @param port
+     * @throws IOException
+     */
+    public void connect(String host, int port) throws IOException {
+        this.host = host;
+        this.port = port;
+        serializeRequest(req, headSendBuf);    
+        sendState = State.HEAD;
+        if (rawRecvBuf.length() > 0) {
+            log.severe("pre-existing data");
+        }
+        // Get an existing channel, set it in 'write' mode.
+        // TODO: only need 'WRITE' if we can't write.
+        String target = host + ":" + port;
+        SelectorData sdata = (pool == null) ? null :
+            pool.getSocketPool().getChannel(target, 
+                selectorCallback);
+        if (sdata == null) {
+            if (debug) {
+                log.info("HTTP_CONNECT: New connection " + target + " " + this);
+            }
+            selector.connect(host, port, selectorCallback);
+        } else {
+            setSelectorData(sdata);
+            if (debug) {
+                log.info("HTTP_CONNECT: Reuse connection " + target + " " + this);
+            }
+            sendHead();
+        }
+    }
+    
+    void sendHead() throws IOException {
+        startWrite(headSendBuf);
+        // TODO: for post ( or body ) - don't send end, let body to be sent
+        endSend(this);
+    }
+    
+    public void release() throws IOException {
+        serviceDone = true;
+        maybeRelease();
+    }
+    
+    public void ioThreadRun(SelectorData selThread) throws IOException {
+        // Currently used only to set keep alive.
+        // If we need more, need to use state or pass a param.
+        returnToPool();
+    }
+    
+    /**
+     * Called when all done:
+     *  - service finished ( endService was called )
+     *  - output written
+     *  - input read
+     * @throws IOException 
+     */
+    protected void maybeRelease() throws IOException {
+        synchronized (this) {
+            if (serviceDone && sendDone) {
+                if (debug) {
+                    log.info("RELEASE ser=" + ser + " serviceDone=" + serviceDone +  " sendDone=" + sendDone);
+                }
+                if (!allDone) {
+                    allDone = true;
+                    // pipeline or keepalive
+                    selector.ioThreadRun(selectorData);
+                }
+            } else {
+                if (debug) {
+                    log.info("Attempt RELEASE " + ser + " " + serviceDone +  " " + sendDone);
+                }
+            }
+        }
+    }
+    
+    /** 
+     * Return to the pool. Must be called _after_ all IO is done
+     * @throws IOException 
+     */
+    protected void returnToPool() throws IOException {
+        if (!selector.isSelectorThread()) {
+            log.severe("Return to pool must be in sel thread");
+        }
+        String target = getTarget();
+       
+        SelectorData sdata = selectorData;
+        recycle();
+        
+        if (pool != null) {
+            if (target != null) {
+                pool.getSocketPool().returnChannel(target, sdata,
+                        selectorCallback);
+            }
+            pool.add(this);
+            if (debug) {
+                log.info("RETURN TO POOL ser=" + ser + " " + this);
+                // new Throwable().printStackTrace();
+            }
+        } 
+    }
+    
+    /**
+     * Read all the remaining data, throw it away.
+     */ 
+    public void readRemaining() {
+    }
+
+
+    public String getStatus() {
+        return status + " " + statusMsg;
+    }
+
+    public String getState() {
+        return recvState.toString() + "/" + sendState.toString() 
+         + "/chunked=" + chunkedRecv  
+         + "/close=" + closeOnEndRecv;
+    }
+    
+    public void setRawReadBuffer(ByteChunk b) throws IOException {
+        if (rawRecvBuf.length() > 0) {
+            // copy remaining bytes
+            b.append(rawRecvBuf);
+        }
+        this.rawRecvBuf = b;
+    }
+
+    public long getRemaining() {
+        return remainingRecv;
+    }
+
+    public String getTarget() {
+        if (host == null) {
+            return null;
+        }
+        return host + ":" + port;
+    }
+
+    public class AsyncHttpSelectorCallback extends SelectorCallback { 
+
+        @Override
+        public void dataReceived(SelectorData selT) throws IOException {
+            // Make sure it's correct. 
+            // TODO: set it in 'fromKeepAlive'
+            AsyncHttp.this.selectorData = selT;
+            
+            if (recvState == State.HEAD) {
+                // Will read at least 1 byte to detect close
+                int res = headDataReceived(selT);
+                if (res == CLOSE) {
+                    selT.sel.close(selT);
+                    return;
+                } else if (res == NEED_MORE) { 
+                    // TODO: what happens if head is too large ( > headBuf ) ? 
+                    //selT.readInterest(this, true);
+                    return;
+                }
+                if (recvState != State.DONE) {
+                    recvState = State.BODY_DATA;
+                }
+                if (debug) {
+                    log.info(dbgName + ": Head done, start body " + ser + 
+                            " buf=" + rawRecvBuf.length() + " r=" + remainingRecv + " c=" + 
+                            chunkedRecv + " rs=" + recvState);
+                }
+            } 
+
+            // TODO: Make sure we don't process more than we need ( eat next req ).
+            // If we read too much: leave it in readBuf, the finalzation code
+            // should skip KeepAlive and start processing it.
+            
+            // we need to read at least something - to detect -1 ( we could 
+            // suspend right away, but seems safer.
+            int rc = CLOSE;
+                        
+            while (true) {
+                // May have a chunk from head recv 
+                if (rawRecvBuf.length() > 0) {
+                    // we have something in the buffer ? Try to consume.
+                    // Or at least make room for more.
+                    int state = rawDataReceived(selT);
+                    // At this point we may have finished the req.
+                    if (state == DONE) {
+                        if (debug) {
+                            log.info("DONE IN RAW " + this);
+                        }
+                        // TODO: suspend read if more bytes are available
+                        
+                    }
+                }
+
+                // it should have been consumed or set to a new 
+                // buffer.
+                rc = tryRead(selT, rawRecvBuf);
+                if (rc == CLOSE) {
+                    closeOnEndSend = true;
+                    recvState = State.DONE;
+                    // Sender closed the recv stream - but he might have kept the 
+                    // send stream open. Example: http1.0 post without keep-alive
+                    selT.sel.readInterest(selT, false);
+                    return;
+                } 
+                if (rc == NEED_MORE) {
+                    return; // 0 bytes read
+                }
+            }
+
+            
+        }
+
+        @Override
+        public void dataWriteable(SelectorData selT) 
+                throws IOException {
+            if (!selT.sel.isSelectorThread()) {
+                throw new IOException("Not ST");
+            }
+            sendData(selT);
+            return;
+        }
+
+        @Override
+        public void channelClosed(SelectorData selThread, Throwable ex) {
+            if (ex != null) {
+                System.err.println("Closed due to error: ");
+                ex.printStackTrace();
+            }
+        }
+
+        @Override
+        public void connected(SelectorData selThread) 
+                throws IOException {
+            AsyncHttp.this.setSelectorData(selThread);
+            sendHead();            
+        }
+        
+        @Override
+        public void ioThreadRun(SelectorData selThread) throws IOException {
+            AsyncHttp.this.ioThreadRun(selThread);
+        }
+        
+        public String toString() {
+            return AsyncHttp.this.toString();
+        }
+    }
+
+    public BlockingHttp getBlockingHttp() {
+        return blockingHttp;
+    }
+    
+}
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java?rev=667814&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java Sat Jun 14 08:28:06 2008
@@ -0,0 +1,126 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.tomcat.util.buf.ByteChunk;
+
+/**
+ * HttpProcessor can be used in blocking mode, using waitResponse()-like 
+ * methods and blocking reader, or it can be used in non-blocking mode, where
+ * each event is handled async ( or both ) 
+ * 
+ */
+public abstract class AsyncHttpCallback {
+    /** 
+     * Implemented by OutputBuffer/InputBuffer or other 
+     * objects that hold a reference to AsyncHttp.
+     */  
+    public static interface AsyncHttpHolder {
+        AsyncHttp getAsyncHttp();
+    }
+
+
+    /**
+     * Called when the incoming headers have been received.
+     * ( response for client mode, request for server mode )
+     */
+    public void headersReceived(AsyncHttp client, 
+                                Request req, Response res) {
+    }
+
+    /** 
+     * Called after the entire body is received.
+     * 
+     *  The request is not yet complete - we may still send. 
+     *  
+     */
+    public void bodyReceived(AsyncHttp client, 
+                             Request req, Response res, 
+                             boolean error, Throwable exception) throws IOException {
+
+    }
+
+    /**
+     * Called after all entity body is sent.
+     */
+    public void bodySent(AsyncHttp client, 
+                         boolean error, Throwable exception) {
+
+    }
+
+    /** 
+     * Called when:
+     *  - body sent
+     *  - body received
+     *  - release() called - either service() done or client done with the 
+     *  buffers. 
+     *  
+     *  After this callback:
+     *  - socket closed if closeOnEndSend, or put in keep-alive
+     *  - AsyncHttp.recycle()
+     *  - returned to the pool.
+     */
+    public void done(AsyncHttp client, 
+                     boolean error, Throwable exception) throws IOException {
+
+    }
+
+    /**
+     * Called when data is received - the buffer will contain 
+     * all the data received so far, not only the last chunk.
+     * 
+     * IMPORTANT: only Content-Length and chunked transfer encoding is processed
+     * by HttpClient, compression or non-standard Content-Encoding and 
+     * Transfer-Encoding are not handled ( they are hard to support in 
+     * non-blocking mode due to lack of libraries ). This method provides access
+     * to the 'raw' data - if the 'encoded' flag is set client can either 
+     * decode it itself, or use the blocking reader instead, which supports
+     * decoding.
+     * 
+     * @param buffer TODO
+     * @param encodings if not null, the list of additional encodings - client 
+     *   must handle them
+     * 
+     * @throws IOException 
+     */
+    public void dataReceived(AsyncHttp client, 
+                             Response res,
+                             ByteChunk data, 
+                             ByteChunk buffer, 
+                             List<String> encodings) throws IOException {
+    }
+
+
+    // TODO: canWrite() callback when all chunks written ?
+    // or maybe chuneWritten(null) ? 
+    
+    /** 
+     * Write is initiated with startWrite(bc).
+     * When the block has been sent, this method is called. 
+     * This can be called from selector thread or from startWrite (if enough 
+     * space in OS buffers ), should not block.
+     */
+    public void dataWritten(ByteChunk bc) {
+    }
+
+
+
+}
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java?rev=667814&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java Sat Jun 14 08:28:06 2008
@@ -0,0 +1,87 @@
+/*
+ */
+package org.apache.coyote.client;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Logger;
+
+import org.apache.tomcat.util.net.SelectorPool;
+
+/**
+ * Factory and pool for AsyncHttp clients.
+ * 
+ * @author Costin Manolache
+ */
+public class AsyncHttpPool {
+
+    Queue<AsyncHttp> pool = new ConcurrentLinkedQueue<AsyncHttp>();
+    SocketPool socketPool;
+    String id; 
+    boolean debug = false;
+    protected Logger log = Logger.getLogger("ClientPool");
+
+    static AsyncHttpPool defaultPool = new AsyncHttpPool("default", null);
+    SelectorPool client;
+    
+    
+    
+    public AsyncHttpPool(String id, SelectorPool selectors) {
+        socketPool = new SocketPool(id);
+        if (selectors == null) {
+            client = SelectorPool.defaultPool();
+        } else {
+            client = selectors;
+        }
+    }
+    
+    public static AsyncHttpPool getDefault() {
+        return defaultPool;
+    }
+
+    /**
+     * Get an existing AsyncHttp object. Since it uses many buffers and 
+     * objects - it's more efficient to pool it. 
+     * 
+     * release will return the object to the pool.
+     */
+    public AsyncHttp get() {
+        AsyncHttp processor = pool.poll();
+        if (processor == null) {
+            if (client == null) {
+                client = SelectorPool.defaultPool();
+            }
+            processor = create();
+            if (debug) {
+                log.info("Create " + id + 
+                        " " + processor.ser);
+            }
+        } else {
+            if (debug) {
+                log.info("Get " + id + " ser=" +
+                    processor.ser + " size=" + pool.size());
+            }
+        }
+        processor.setClientPool(this);
+        return processor;
+    }
+
+    protected AsyncHttp create() {
+        return new AsyncHttp(client);
+    }
+    
+    public void add(AsyncHttp http) {
+        pool.add(http);
+        if (debug) {
+            log.info("Return " + id + " ser=" +
+                http.ser + " size=" + pool.size());
+        }
+    }
+    
+    
+    public SocketPool getSocketPool() {
+        return socketPool;
+    }
+
+    
+}

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java?rev=667814&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java Sat Jun 14 08:28:06 2008
@@ -0,0 +1,311 @@
+/*
+ */
+package org.apache.coyote.client;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Logger;
+
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.coyote.client.AsyncHttp.State;
+import org.apache.tomcat.util.buf.ByteChunk;
+
+/**
+ * Blocking http interface on top of AsyncHttp.
+ * This is the default callback for AsyncHttp, but it can be 
+ * replaced.
+ * 
+ * 
+ * @author Costin Manolache
+ */
+public class BlockingHttp extends AsyncHttpCallback {
+
+    protected Logger log = Logger.getLogger("BlockHttp");
+    boolean debug = false;
+    
+    protected AsyncHttp client;
+    
+    // TODO: break it - each operation should have a different TO    
+    protected long timeout = 10000; // Long.MAX_VALUE;
+    
+    private ReentrantLock responseLock = new ReentrantLock();
+    private Condition responseCond = responseLock.newCondition();
+
+    private ReentrantLock resHeadLock = new ReentrantLock();
+    private Condition resHeadCond = resHeadLock.newCondition();
+
+    private ReentrantLock resDataLock = new ReentrantLock();
+    private Condition resDataCond = resDataLock.newCondition();
+
+    private ReentrantLock writeDataLock = new ReentrantLock();
+    private Condition writeDataCond = writeDataLock.newCondition();
+
+    /**
+     * Store the received data. 
+     */
+    protected List<ByteChunk> inBrigade = new LinkedList<ByteChunk>();
+    boolean done;
+    
+    BlockingHttp(AsyncHttp client) {
+        this.client = client;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long t) {
+        timeout = t;
+    }
+    
+    public void recycle() {
+        inBrigade.clear();
+        done = false;
+    }
+    
+    @Override
+    public void bodyReceived(AsyncHttp client, 
+                             Request req, Response res, 
+                             boolean error, Throwable exception) {
+    }
+    
+    @Override
+    public void done(AsyncHttp client, 
+                     boolean error, Throwable exception) {
+        
+        signal(resHeadLock, resHeadCond, "resHead");        
+        signal(resDataLock, resDataCond, "resData");
+        synchronized (this) {
+            done = true;
+        }
+        signal(responseLock, responseCond, "response");
+    }
+    
+    static int DEFAULT_CHUNK_SIZE = 4096;
+    
+    @Override
+    public void dataReceived(AsyncHttp client, 
+                             Response res,
+                             ByteChunk currentRecvBucket, 
+                             ByteChunk rawReadBuffer, 
+                             List<String> encodings) 
+            throws IOException {
+
+        if (rawReadBuffer.getBuffer().length - 
+                rawReadBuffer.getEnd() < 16) {
+            ByteChunk newBuffer = new ByteChunk(DEFAULT_CHUNK_SIZE);
+            client.setRawReadBuffer(newBuffer);
+        }
+        ByteChunk bb = new ByteChunk();
+        bb.setBytes(currentRecvBucket.getBytes(), 
+                currentRecvBucket.getOffset(), 
+                currentRecvBucket.length());
+        inBrigade.add(bb);
+        signal(resDataLock, resDataCond, "resData");
+    }
+    
+    public void headersReceived(AsyncHttp client, 
+                                Request req, Response res) {
+        signal(resHeadLock, resHeadCond, "resHead");
+    }
+    
+    /** 
+     * Write is initiated with startWrite(bc).
+     * When the block has been sent, this method is called. 
+     * This can be called from selector thread or from startWrite (if enough 
+     * space in OS buffers ), should not block.
+     */
+    public void dataWritten(ByteChunk bc) {
+        if (client.sendBrigade.size() == 0) {
+            // Done writting, no longer interested in write callback
+            signal(writeDataLock, writeDataCond, "writeData");
+        }
+    }
+    
+    private void signal(ReentrantLock lock, Condition cond, 
+                        String name) {
+        lock.lock();
+        try {
+            if (debug) {
+                log.info("Sent notification ser=" + client.ser + 
+                        " " + name +
+                        " done=" + done + 
+                        " rcv=" + client.recvState +
+                        " snd=" + client.sendState +
+                        " inSize=" + inBrigade.size() + 
+                        " outSize=" + client.sendBrigade.size() +
+                        " l=" + lock + 
+                        " s=" + cond
+                        );
+                        
+            }
+            cond.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    private long waitSignal(long to, 
+                            ReentrantLock lock, 
+                            Condition cond,
+                            String name) throws IOException {
+        if (to < 0) {
+            throw new IOException(new TimeoutException());
+        } else if (to == 0) {
+            to = timeout;
+        }
+        lock.lock();
+        try {
+            if (debug) {
+                log.info("Wait notification " + name +
+                        " done=" + done + 
+                        " rcv=" + client.recvState +
+                        " snd=" + client.sendState +
+                        " inSize=" + inBrigade.size() + 
+                        " outSize=" + client.sendBrigade.size() + 
+                        " ser=" + client.ser + 
+                        " l=" + lock + 
+                        " s=" + cond
+                        );
+            }
+            if (to == Long.MAX_VALUE) { 
+                cond.await();
+            } else {
+                to = cond.awaitNanos(1000000 * to);
+            }
+            if (debug) {
+                log.info("Notification recv " + name +
+                        " rcv=" + client.recvState +
+                        " snd=" + client.sendState +
+                        " inSize=" + inBrigade.size() + 
+                        " outSize=" + client.sendBrigade.size() + 
+                        " ser=" + client.ser);
+            }
+        } catch (InterruptedException ex) {
+            ex.printStackTrace();
+        } finally {
+            lock.unlock();
+        }
+        return to;
+    }
+
+    
+    public void waitResponse(long to) throws IOException {
+        while(true) {
+            synchronized (this) {
+                if (done) {
+                    return;
+                }
+            }
+            to = waitSignal(to, responseLock, responseCond, "response");
+        }
+    }
+
+
+    public void waitResponseHead(long to) throws IOException {
+        while(client.recvState == State.HEAD) {
+            to = waitSignal(to, resHeadLock, resHeadCond, "resHead");
+        }
+    }
+
+    /**
+     * Block until some data is received. 
+     * This only works if no callback is set - callbacks need
+     * to consume or move the data.
+     * @throws TimeoutException 
+     */
+    public void waitDataRead(long to) throws IOException {
+        while(inBrigade.size() == 0 && client.recvState != State.DONE) {
+            to = waitSignal(to, resDataLock, resDataCond, "resData");
+        }
+    }
+
+    /**
+     * Block until all data is sent - i.e. nothing left to 
+     * send
+     * @throws TimeoutException 
+     */
+    public void waitDataWrite(long to) throws IOException {
+        while(client.sendBrigade.size() > 0) {
+            to = waitSignal(to, writeDataLock, writeDataCond, "writeData");
+        }
+    }
+
+    // This is where data will be saved by selector read
+    ThreadLocal<ByteChunk> writeBuffer = new ThreadLocal<ByteChunk>() {
+        @Override
+        protected ByteChunk initialValue() {
+            return new ByteChunk();
+        }
+    };
+
+    /** 
+     * Blocking write. 
+     * @param chunk contains data, will not be modified
+     */
+    public int doWrite(ByteChunk chunk) 
+            throws IOException {
+        // This is blocking.
+        ByteChunk wb = writeBuffer.get();
+        wb.setBytes(chunk.getBuffer(), chunk.getOffset(), 
+                chunk.length());
+        boolean done = client.startWrite(wb);
+        if (!done) {
+            // block until data is received.
+            waitDataWrite(timeout);
+        }
+        // TODO: allow non-blocking write
+        return chunk.length();
+    }
+    
+    public int available() {
+        int avail = 0;
+        for (ByteChunk bc: inBrigade) {
+            avail += bc.length();
+        }
+        return avail;
+    }
+    
+    /** 
+     * Return the next received chunk. The ByteChunk is now 
+     * 'owned' by the caller. 
+     */
+    public ByteChunk readBlocking(long to) 
+            throws IOException {
+        // block until data is received.
+        if (inBrigade.size() == 0 && client.recvState == State.DONE) {
+            return null; // end
+        } else {
+            if (inBrigade.size() == 0) {
+                waitDataRead(timeout);
+                if (inBrigade.size() == 0) {
+                    if (client.recvState == State.DONE) {
+                        return null; 
+                    } else {
+                        throw new IOException("Timeout");
+                    }
+                }
+            }
+            ByteChunk currentBucket = inBrigade.get(0);
+            inBrigade.remove(0);
+            return currentBucket;
+        }
+    }
+    
+    public void readAll(ByteChunk chunk, long to) 
+            throws IOException {
+        waitResponse(to);
+        while (true) { 
+            ByteChunk next = readBlocking(to);
+            if (next == null) {
+                return;
+            }
+            chunk.append(next);
+        }
+    }
+}

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java?rev=667814&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java Sat Jun 14 08:28:06 2008
@@ -0,0 +1,168 @@
+/*
+ */
+package org.apache.coyote.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+import org.apache.tomcat.util.modeler.Registry;
+import org.apache.tomcat.util.net.SelectorCallback;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
+/** 
+ * Very simple socket pool. For each remote host holds a list 
+ * of open connections, with a close callback. 
+ * 
+ * TODO: add timeouts, limits per host/total, expire old entries 
+ * 
+ * @author Costin Manolache
+ */
+public class SocketPool {
+    static Logger log = Logger.getLogger(SocketPool.class.getName());
+    static boolean debug = false;
+    /** 
+     * Wait for close from the other end, remove the socket from the pool. 
+     */
+    static class CloseCallback extends SelectorCallback {
+        RemoteServer target;
+
+        @Override
+        public void dataReceived(SelectorData selThread) 
+        throws IOException {
+            ByteBuffer bb = ByteBuffer.allocate(1024);
+            int rd = selThread.sel.readNonBlocking(selThread, bb);
+            if (rd < 0) {
+                // closed
+                selThread.sel.close(selThread);
+                return;
+            } 
+            log.warning("Unexpected data waiting close " + selThread);
+            selThread.sel.close(selThread);
+            return;
+        }
+
+        @Override
+        public void channelClosed(SelectorData selThread, Throwable ex) {
+            if (debug) {
+                log.fine("Closed..." + selThread);
+            }
+            target.pool.closedSockets.incrementAndGet();
+            synchronized (target) {
+                target.connections.remove(selThread);
+            }
+        }
+    }
+
+
+    /** 
+     * Connections for one remote host.
+     * This should't be restricted by IP:port or even hostname,
+     * for example if a server has multiple IPs or LB replicas - any would work.   
+     */
+    static class RemoteServer {
+        SocketPool pool;
+        ArrayList<SelectorData> connections = new ArrayList<SelectorData>();
+    }
+
+
+    /**
+     * Map from client names to socket pools.
+     */
+    Map<CharSequence, RemoteServer> hosts = new HashMap<CharSequence, RemoteServer>();
+
+    // Statistics
+    AtomicInteger waitingSockets = new AtomicInteger();
+    AtomicInteger closedSockets = new AtomicInteger();
+
+    static Registry registry = Registry.getRegistry(null, null);
+
+    // TODO: keep track of all socket pools, enforce max total sockets.
+    // TODO: for linux, find how many sockets we can have open, use as limit
+    
+    public SocketPool(String id) {
+        try {
+            registry.registerComponent(this, ":name=SocketPool,id=" + id, "SocketPool");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public int getTargetCount() {
+        return hosts.size();
+    }
+
+    public int getSocketCount() {
+        return waitingSockets.get();
+    }
+
+    public int getClosedSockets() {
+        return closedSockets.get();
+    }
+
+    public String dumpSockets() {
+        StringBuffer sb = new StringBuffer();
+        for (CharSequence k: hosts.keySet()) {
+            RemoteServer t = hosts.get(k);
+            sb.append(k).append("=").append(t.connections.size()).append("\n");
+        }
+        return sb.toString();
+    }
+
+    /** 
+     * @param key host:port, or some other key if multiple hosts:ips
+     * are connected to equivalent servers ( LB ) 
+     * @throws IOException 
+     */
+    public SelectorData getChannel(CharSequence key, 
+                                   SelectorCallback cstate) throws IOException {
+        RemoteServer t = null;
+        synchronized (hosts) {
+            t = hosts.get(key);
+        }
+        if (t == null) {
+            return null;
+        }
+        if (t.connections.size() == 0) {
+            return null;
+        } // one may be added - no harm.
+        SelectorData res = null;
+        synchronized (t) {
+            res = t.connections.remove(t.connections.size() - 1);
+
+            waitingSockets.decrementAndGet();
+            if (res == null) {
+                return null;
+            }
+            res.sel.updateCallback(res, res.callback, cstate);
+        }
+        return res;      
+    }
+
+    public void returnChannel(CharSequence key, 
+                              SelectorData sdata,
+                              SelectorCallback cstate) 
+            throws IOException {
+        RemoteServer t = null;
+        synchronized (hosts) {
+            t = hosts.get(key);
+            if (t == null) {
+                t = new RemoteServer();
+                t.pool = this;
+                hosts.put(key, t);
+            }
+        }
+        CloseCallback cc = new CloseCallback();
+        cc.target = t;
+        waitingSockets.incrementAndGet();
+        synchronized (t) {
+            t.connections.add(sdata);      
+            sdata.sel.updateCallback(sdata, cstate, cc);
+            sdata.sel.readInterest(sdata, true);
+        }
+    }
+}

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org