You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [4/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Helper class for writing some basic HTML.
+ */
+
+package org.apache.cassandra.net.http;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HTMLFormatter
+{
+    protected StringBuilder sb_ = null;
+    private boolean writeBody_;
+
+    public HTMLFormatter()
+    {
+        sb_ = new StringBuilder();
+    }
+
+    public HTMLFormatter(StringBuilder sb)
+    {
+        sb_ = sb;
+    }
+
+    public void startBody()
+    {
+    	startBody(false, "", true, true);
+    }
+
+    public void startBody(boolean writeJSCallback, String jsCallbackFunction, boolean writeCSS, boolean writeBody)
+    {
+    	writeBody_ = writeBody;
+
+        sb_.append("<html>\n");
+        if(writeCSS || writeJSCallback)
+        {
+	        sb_.append("<head>\n");
+	        if(writeJSCallback)
+	        	addJSCallback(jsCallbackFunction);
+	        if(writeCSS)
+	        	addCSS();
+	        sb_.append("</head>\n");
+        }
+
+        if(writeBody)
+        {
+        	sb_.append("<body bgcolor=black>\n");
+        }
+    }
+
+    public void endBody()
+    {
+    	if(writeBody_)
+    	{
+    		sb_.append("</body>\n");
+    	}
+        sb_.append("</html>\n");
+    }
+
+    public void appendLine(String s)
+    {
+        sb_.append(s);
+        sb_.append("<br>\n");
+    }
+
+    public void append(String s)
+    {
+        sb_.append(s);
+    }
+
+    /*
+     * Appends jscript wrapped in a script element. The MIME type is given in the
+     * type attribute, as addJSCallback does; used as a language attribute value
+     * it does not name a script language.
+     */
+    public void addJScript(String jscript)
+    {
+        append("<script type=\"text/javascript\">\n");
+        append(jscript + "\n");
+        append("</script>\n");
+    }
+
+    public void startTable()
+    {
+        sb_.append("<table>\n");
+    }
+
+    public void addHeaders(String[] sTableHeaders)
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\"	>\n");
+        for (int i = 0; i < sTableHeaders.length; ++i)
+        {
+            sb_.append("<th><div class=\"tmenubar\">");
+            sb_.append("<b>" + sTableHeaders[i] + "</b>");
+            sb_.append("</div></th>\n");
+        }
+        sb_.append("\n</tr>\n\n");
+    }
+
+    public void addHeader(String sTableHeader)
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\"	>\n");
+        sb_.append("<th><div class=\"tmenubar\">");
+        sb_.append("<b>" + sTableHeader + "</b>");
+        sb_.append("</div></th>\n");
+        sb_.append("\n</tr>\n\n");
+    }
+
+    public void startRow()
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+    }
+
+    public void addCol(String sData)
+    {
+        sb_.append("<td style=\"border: 2px solid #333333\">");
+        sb_.append(sData);
+        sb_.append("</td>");
+    }
+
+    public void endRow()
+    {
+        sb_.append("</tr>\n");
+    }
+
+    public void endTable()
+    {
+        sb_.append("</table>\n");
+    }
+
+    public void addCombobox(Set<String> comboBoxEntries, String htmlElementName)
+    {
+    	addCombobox(comboBoxEntries, htmlElementName, -1);
+    }
+
+    /**
+     * Appends a single-row select element with one option per entry.
+     * @param defaultSelected zero-based position (in iteration order) of the option
+     *        to mark SELECTED; -1 adds a leading, selected "Select an option" entry.
+     */
+    public void addCombobox(Set<String> comboBoxEntries, String htmlElementName, int defaultSelected)
+    {
+        sb_.append("  <select name=" + htmlElementName + " size=1>\n");
+        if(defaultSelected == -1)
+        {
+            sb_.append("    <option value=\"\" SELECTED>Select an option \n");
+        }
+
+        int i = 0;
+        for(String colFamName : comboBoxEntries)
+        {
+            /* the index must advance with every entry, otherwise only position 0 can ever match */
+            String selected = (defaultSelected == i++) ? " SELECTED" : "";
+            sb_.append("    <option value=\"" + colFamName + "\"" + selected + ">" + colFamName + "\n");
+        }
+        sb_.append("  </select>\n");
+    }
+
+    public void addDivElement(String divId, String value)
+    {
+    	sb_.append("<div id = \"" + divId + "\">");
+    	if(value != null)
+    		sb_.append(value);
+    	sb_.append("</div>\n");
+    }
+
+    /*
+     * Appends a bordered table: an optional bold header row followed by one row per
+     * entry of sTable. Nothing is written when sTable is null or empty. A null
+     * sTableHeaders or a null row is skipped instead of failing half-way through.
+     */
+    public void createTable(String[] sTableHeaders, String[][] sTable)
+    {
+        if (sTable == null || sTable.length == 0)
+            return;
+
+        sb_.append("<table style=\"border: 2px solid #333333\">\n");
+        if (sTableHeaders != null)
+        {
+            sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+            for (String header : sTableHeaders)
+                sb_.append("<td style=\"border: 2px solid #333333\"><b>" + header + "</b></td>\n");
+            sb_.append("\n</tr>\n\n");
+        }
+        for (String[] row : sTable)
+        {
+            if (row == null)
+                continue;
+            sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+            for (String cell : row)
+                sb_.append("<td style=\"border: 2px solid #333333\">" + cell + "</td>\n");
+            sb_.append("\n</tr>\n\n");
+        }
+        sb_.append("</table>\n");
+    }
+
+    public void addJSCallback(String jsCallbackFunction)
+    {
+    	sb_.append("<script type=\"text/javascript\">\n");
+
+    	addJSForTabs();
+
+    	sb_.append(jsCallbackFunction +"\n");
+    	sb_.append("</script>\n");
+    }
+
+    public void addCSS()
+    {
+        sb_.append("<style type=\"text/css\">\n");
+        sb_.append("body\n");
+        sb_.append("{\n");
+        sb_.append("  color:white;\n");
+        sb_.append("  font-family:Arial Unicode MS,Verdana, Arial, Sans-serif;\n");
+        sb_.append("  font-size:10pt;\n");
+        sb_.append("}\n");
+
+        sb_.append(".tmenubar\n");
+        sb_.append("{\n");
+        sb_.append("  background-color:green;\n");
+        sb_.append("  font-family:Verdana, Arial, Sans-serif;\n");
+        sb_.append("  font-size:10pt;\n");
+        sb_.append("  font-weight:bold;\n");
+        sb_.append("}\n");
+
+        sb_.append("th\n");
+        sb_.append("{\n");
+        sb_.append(" 	 color:white;\n");
+        sb_.append("}\n");
+
+        sb_.append("td\n");
+        sb_.append("{\n");
+        sb_.append(" 	 color:white;\n");
+        sb_.append("}\n");
+        sb_.append("a:link {color:#CAF99B;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana}\n");
+        sb_.append("a:visited {color:red}\n");
+        sb_.append("a:hover{color:yellow;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana;background-color:green}\n");
+
+        addCSSForTabs();
+
+        sb_.append("</style>\n");
+
+    }
+
+    public void addCSSForTabs()
+    {
+    	sb_.append("#header ul {\n");
+    	sb_.append("	list-style: none;\n");
+    	sb_.append("	padding: 0;\n");
+    	sb_.append("	margin: 0;\n");
+    	sb_.append("	}\n");
+    	sb_.append("\n");
+    	sb_.append("#header li {\n");
+    	sb_.append("	float: left;\n");
+    	sb_.append("	border: 1px solid #bbb;\n");
+    	sb_.append("	border-bottom-width: 0;\n");
+    	sb_.append("	margin: 0;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#header a {\n");
+    	sb_.append("	text-decoration: none;\n");
+    	sb_.append("	display: block;\n");
+    	sb_.append("	background: #eee;\n");
+    	sb_.append("	padding: 0.24em 1em;\n");
+    	sb_.append("	color: #00c;\n");
+    	sb_.append("	width: 8em;\n");
+    	sb_.append("	text-align: center;\n");
+    	sb_.append("	}\n");
+    	sb_.append("\n");
+    	sb_.append("#header a:hover {\n");
+    	sb_.append("	background: #ddf;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#header #selected {\n");
+    	sb_.append("	border-color: black;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#header #selected a {\n");
+    	sb_.append("	position: relative;\n");
+    	sb_.append("	top: 1px;\n");
+    	sb_.append("	background: white;\n");
+    	sb_.append("	color: black;\n");
+    	sb_.append("	font-weight: bold;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#content {\n");
+    	sb_.append("	border: 1px solid black;\n");
+    	sb_.append("	visibility:hidden;\n");
+    	sb_.append("	position:absolute;\n");
+    	sb_.append("	top:200;\n");
+    	sb_.append("	clear: both;\n");
+    	sb_.append("	padding: 0 1em;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("h1 {\n");
+    	sb_.append("	margin: 0;\n");
+    	sb_.append("	padding: 0 0 1em 0;\n");
+    	sb_.append("}\n");
+    }
+
+    public void addJSForTabs()
+    {
+    	sb_.append("var curSelectedDivId = \"one\";\n");
+    	sb_.append("\n");
+    	sb_.append("function selectTab(tabDivId)\n");
+    	sb_.append("{\n");
+    	sb_.append("	var x = document.getElementsByName(curSelectedDivId);\n");
+    	sb_.append("	if(x[1])\n");
+    	sb_.append("		x[1].style.visibility=\"hidden\";\n");
+    	sb_.append("	if(x[0])\n");
+    	sb_.append("		x[0].id=curSelectedDivId;\n");
+    	sb_.append("\n");
+    	sb_.append("\n");
+    	sb_.append("	var y = document.getElementsByName(tabDivId);\n");
+    	sb_.append("	if(y[1])\n");
+    	sb_.append("		y[1].style.visibility=\"visible\";\n");
+    	sb_.append("	if(y[0])\n");
+    	sb_.append("		y[0].id = \"selected\";\n");
+    	sb_.append("\n");
+    	sb_.append("	curSelectedDivId = tabDivId;\n");
+    	sb_.append("}\n");
+    }
+
+    public String toString()
+    {
+        return sb_.toString();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class accepts a client connection and parses http data from it.
+ */
+
+// TODO: shouldClose_ is not used correctly. It should be used to close the socket? When?
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.net.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpConnection extends SelectionKeyHandler implements HttpStartLineParser.Callback, HttpHeaderParser.Callback
+{
+    private static Logger logger_ = Logger.getLogger(HttpConnection.class); // log under this class, not StorageService
+    public static final String httpRequestVerbHandler_ = "HTTP-REQUEST-VERB-HANDLER";
+    public static final String httpStage_ = "HTTP-STAGE";
+
+    /*
+     * These are the callbacks into who ever intends
+     * to listen on the client socket.
+     */
+    public interface HttpConnectionListener
+    {
+        public void onRequest(HttpRequest httpRequest);
+        public void onResponse(HttpResponse httpResponse);
+    }
+
+    enum HttpMessageType
+    {
+        UNKNOWN,
+        REQUEST,
+        RESPONSE
+    }
+
+    enum ParseState
+    {
+        IN_NEW,
+        IN_START,
+        IN_HEADERS, IN_BODY
+    }
+
+    private ParseState parseState_ = ParseState.IN_NEW;
+    private long parseStartTime_ = 0;
+    private HttpMessageType currentMsgType_ = HttpMessageType.UNKNOWN;
+    private int contentLength_ = 0;
+    private List<ByteBuffer> bodyBuffers_ = new LinkedList<ByteBuffer>();
+    private boolean shouldClose_ = false;
+    private String defaultContentType_ = "text/html";
+    private HttpRequest currentRequest_ = null;
+    private HttpResponse currentResponse_ = null;
+    private HttpStartLineParser startLineParser_ = new HttpStartLineParser(this);
+    private HttpHeaderParser headerParser_ = new HttpHeaderParser(this);
+    /* Selection Key associated with this HTTP Connection */
+    private SelectionKey httpKey_;
+    /* SocketChannel associated with this HTTP Connection */
+    private SocketChannel httpChannel_;
+    /* HTTPReader instance associated with this HTTP Connection */
+    private HTTPReader httpReader_ = new HTTPReader();
+
+    /*
+     * This abstraction starts reading the data that comes in
+     * on a HTTP request. It accumulates the bytes read into
+     * a buffer and passes the buffer to the HTTP parser.
+    */
+
+    class HTTPReader implements Runnable
+    {
+        /* We read 256 bytes at a time from a HTTP connection */
+        private static final int bufferSize_ = 256;
+
+        /*
+         * Performs one read from the channel into a fresh buffer.
+         *  - bytes were read: the buffer is handed to parse(), which calls
+         *    modifyKeyForRead(httpKey_) itself when it is done;
+         *  - nothing was read: the key is re-registered for reads here, since
+         *    parse() is not called;
+         *  - end of stream (-1) or an I/O error: the connection is closed.
+         */
+        public void run()
+        {
+            ByteBuffer readBuffer = ByteBuffer.allocate(HTTPReader.bufferSize_);
+            try
+            {
+                int bytesRead = httpChannel_.read(readBuffer);
+                readBuffer.flip();
+                if ( bytesRead > 0 )
+                {
+                    HttpConnection.this.parse(readBuffer);
+                }
+                else if ( bytesRead == 0 )
+                {
+                    SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+                }
+                else
+                {
+                    /* -1: the peer has closed its end of the connection */
+                    HttpConnection.this.close();
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+                HttpConnection.this.close();
+            }
+        }
+    }
+
+    public static class HttpRequestMessage
+    {
+        private HttpRequest httpRequest_;
+        private HttpConnection httpConnection_;
+
+        HttpRequestMessage(HttpRequest httpRequest, HttpConnection httpConnection)
+        {
+            httpRequest_ = httpRequest;
+            httpConnection_ = httpConnection;
+        }
+
+        public HttpRequest getHttpRequest()
+        {
+            return httpRequest_;
+        }
+
+        public HttpConnection getHttpConnection()
+        {
+            return httpConnection_;
+        }
+    }
+
+    /*
+     *  Read called on the Selector thread. This is called
+     *  when there is some HTTP request that needs to be
+     *  processed.
+    */
+    public void read(SelectionKey key)
+    {
+        if ( httpKey_ == null )
+        {
+            httpKey_ = key;
+            httpChannel_ = (SocketChannel)key.channel();
+        }
+        /* deregister interest for read */
+        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+        /* Add a task to process the HTTP request */
+        MessagingService.getReadExecutor().execute(httpReader_);
+    }
+
+    /* Adds OP_READ to the interest set of the key that was passed in. */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        /* start from this key's own interest set; httpKey_ is null until read() has been called */
+        key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+    }
+
+    private void resetParserState()
+    {
+        startLineParser_.resetParserState();
+        headerParser_.resetParserState();
+        parseState_ = ParseState.IN_NEW;
+        contentLength_ = 0;
+        bodyBuffers_ = new LinkedList<ByteBuffer>();
+        currentMsgType_ = HttpMessageType.UNKNOWN;
+        currentRequest_ = null;
+        currentResponse_ = null;
+    }
+
+    public void close()
+    {        
+        logger_.info("Closing HTTP socket ...");
+        if ( httpKey_ != null )
+            SelectorManager.getSelectorManager().cancel(httpKey_);
+    }
+
+    /*
+     * Feeds one buffer of bytes read from the client socket through the HTTP
+     * parser. parseState_ records how far the current message has been parsed
+     * (start line, headers, body), so a message may arrive spread over several
+     * calls. Once the body is complete a request is handed to handleRequest()
+     * and the parser state is reset. modifyKeyForRead(httpKey_) is called in
+     * the finally block, whatever the outcome.
+     *
+     * A Content-Length that is not a number, or is negative, is rejected with
+     * an HttpParsingException, which is logged by the catch block below.
+    */
+    public void parse(ByteBuffer bb)
+    {
+        try
+        {
+            logger_.debug("Processing http requests from socket ...");
+            switch (parseState_)
+            {
+                case IN_NEW:
+                    parseState_ = ParseState.IN_START;
+                    parseStartTime_ = System.currentTimeMillis();
+
+                // fall through
+                case IN_START:
+                    if (startLineParser_.onMoreBytesNew(bb) == false)
+                    {
+                        break;
+                    }
+                    else
+                    {
+                        /* Already done through the callback */
+                        parseState_ = ParseState.IN_HEADERS;
+                    }
+
+                // fall through
+                case IN_HEADERS:
+                    if (headerParser_.onMoreBytesNew(bb) == false)
+                    {
+
+                        break; // need more bytes
+                    }
+                    else
+                    {
+                        String len;
+                        if (currentMsgType_ == HttpMessageType.REQUEST)
+                        {
+                            len = currentRequest_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+
+                            // determine whether the connection should be closed
+                            if (currentRequest_.getVersion().equalsIgnoreCase("HTTP/1.1"))
+                            {
+                                /*
+                                 * Scan all of the headers for close messages
+                                 */
+                                String val = currentRequest_.getHeader(HttpProtocolConstants.CONNECTION);
+
+                                if (val != null && val.equalsIgnoreCase(HttpProtocolConstants.CLOSE))
+                                {
+                                    shouldClose_ = true;
+                                }
+                            } else if (currentRequest_.getVersion().equalsIgnoreCase("HTTP/1.0"))
+                            {
+                                /* By default no keep-alive */
+                                shouldClose_ = true;
+
+                                /*
+                                 * Scan all of the headers for keep-alive
+                                 * messages
+                                 */
+                                String val = currentRequest_.getHeader(HttpProtocolConstants.CONNECTION);
+
+                                if (val != null && val.equalsIgnoreCase(HttpProtocolConstants.KEEP_ALIVE))
+                                {
+                                    shouldClose_ = false;
+                                }
+                            } else
+                            {
+                                /* Assume 0.9 */
+                                shouldClose_ = true;
+                            }
+                        }
+                        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+                        {
+                            len = currentResponse_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+
+                        // TODO: pay attention to keep-alive and
+                        // close headers
+                        }
+                        else
+                        {
+                            logger_.warn("in HttpConnection::processInput_() Message type is not set");
+                            return;
+                        }
+
+                        if (len != null)
+                        {
+                            try
+                            {
+                                contentLength_ = len.equals("") ? 0 : Integer.parseInt(len);
+                            }
+                            catch (NumberFormatException ex)
+                            {
+                                throw new HttpParsingException();
+                            }
+                            /*
+                             * A negative length can never count down to zero in
+                             * IN_BODY, so the message would never complete.
+                             */
+                            if (contentLength_ < 0)
+                            {
+                                throw new HttpParsingException();
+                            }
+                        }
+                        parseState_ = ParseState.IN_BODY;
+                    }
+
+                // fall through
+                case IN_BODY:
+                    boolean done = false;
+
+                    if (contentLength_ > 0)
+                    {
+                        if (bb.remaining() > contentLength_)
+                        {
+                            /* the buffer holds more than the rest of the body: keep only the body part */
+                            int newLimit = bb.position() + contentLength_;
+                            bodyBuffers_.add(((ByteBuffer) bb.duplicate().limit(newLimit)).slice());
+                            bb.position(newLimit);
+                            contentLength_ = 0;
+                        }
+                        else
+                        {
+                            /* everything left in the buffer belongs to the body */
+                            contentLength_ -= bb.remaining();
+                            bodyBuffers_.add(bb.duplicate());
+                            bb.position(bb.limit());
+                        }
+                    }
+
+                    if (contentLength_ == 0)
+                    {
+                        done = true;
+                    }
+
+                    if (done)
+                    {
+                        if (currentMsgType_ == HttpMessageType.REQUEST)
+                        {
+                            //currentRequest_.setParseTime(env_.getCurrentTime() - parseStartTime_);
+                            currentRequest_.setBody(bodyBuffers_);
+
+                            if (currentRequest_.getHeader("Content-Type") == null)
+                            {
+                                currentRequest_.addHeader("Content-Type", defaultContentType_);
+                            }
+
+                            handleRequest(currentRequest_);
+                        }
+                        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+                        {
+                            logger_.info("Holy shit! We are not supposed to be here - ever !!!");
+                        }
+                        else
+                        {
+                            logger_.error("Http message type is still" +
+                                    " unset after we finish parsing the body?");
+                        }
+
+                        resetParserState();
+                    }
+            }
+
+        }
+        catch (final Throwable e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+            //close();
+        }
+        finally
+        {
+            SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+        }
+    }
+
+    /*
+     * Writes the whole buffer to the client and then closes the connection.
+     * close() runs in a finally block so that a failed write does not leave
+     * the selection key registered.
+     */
+    public void write(ByteBuffer buffer)
+    {
+        /*
+         * TODO: Make this a non blocking write.
+        */
+        try
+        {
+            while ( buffer.remaining() > 0 )
+            {
+                httpChannel_.write(buffer);
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        finally
+        {
+            close();
+        }
+    }
+
+    private void handleRequest(HttpRequest request)
+    {
+        HttpConnection.HttpRequestMessage httpRequestMessage = new HttpConnection.HttpRequestMessage(request, this);
+        Message httpMessage = new Message(null, HttpConnection.httpStage_, HttpConnection.httpRequestVerbHandler_, new Object[]{httpRequestMessage});
+        MessagingService.receive(httpMessage);
+    }
+
+    // HttpStartLineParser.Callback interface implementation
+    public void onStartLine(String method, String path, String query, String version)
+    {
+        logger_.debug("Startline method=" + method + " path=" + path + " query=" + query + " version=" + version);
+
+        /* a start line whose first token begins with "HTTP" is treated as a response */
+        boolean isResponse = method.startsWith("HTTP");
+        if (!isResponse)
+        {
+            currentMsgType_ = HttpMessageType.REQUEST;
+            currentRequest_ = new HttpRequest();
+            currentRequest_.setStartLine(method, path, query, version);
+            return;
+        }
+
+        currentMsgType_ = HttpMessageType.RESPONSE;
+        currentResponse_ = new HttpResponse();
+        currentResponse_.setStartLine(method, path, version);
+    }
+
+    // HttpHeaderParser.Callback interface implementation
+    public void onHeader(String name, String value)
+    {
+        /* hand the header to whichever message is currently being parsed */
+        switch (currentMsgType_)
+        {
+            case REQUEST:
+                currentRequest_.addHeader(name, value);
+                break;
+            case RESPONSE:
+                currentResponse_.addHeader(name, value);
+                break;
+            default:
+                logger_.warn("Unknown message type -- HttpConnection::onHeader()");
+                break;
+        }
+
+        logger_.debug(name + " : " + value);
+    }
+}
+
+
+

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.log4j.Logger;
+
+public class HttpConnectionHandler extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(HttpConnectionHandler.class);
+    
+    /*
+     * Accepts a pending client connection on the server channel of the given
+     * key, switches it to non-blocking mode and registers it for OP_READ with
+     * a new HttpConnection as its handler. If setting the client up fails,
+     * the accepted channel is closed again.
+     */
+    public void accept(SelectionKey key)
+    {
+        SocketChannel client = null;
+        try
+        {
+            ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+            client = serverChannel.accept();
+            if ( client != null )
+            {
+                client.configureBlocking(false);
+                SelectionKeyHandler handler = new HttpConnection();
+                SelectorManager.getSelectorManager().register(client, handler, SelectionKey.OP_READ);
+            }
+        }
+        catch(IOException e)
+        {
+            /* pass the exception as the throwable argument so that its stack trace is logged */
+            logger_.warn("Failed to accept HTTP connection", e);
+            closeQuietly(client);
+        }
+    }
+
+    /* Closes a channel that was accepted but could not be set up; a null channel is ignored. */
+    private void closeQuietly(SocketChannel channel)
+    {
+        if ( channel == null )
+            return;
+        try
+        {
+            channel.close();
+        }
+        catch(IOException ignored)
+        {
+            /* nothing more can be done for a channel that is being discarded */
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author kranganathan
+ */
+/**
+ * A parser for HTTP header lines.  
+ * 
+ */
+public class HttpHeaderParser
+{
+
+    // receives each completed header; when null, headers are parsed and dropped
+    private final Callback callback_;
+
+    /**
+     * Receives every header the parser completes.
+     */
+    public interface Callback
+    {
+
+        /**
+         * Invoked once per complete header.
+         *
+         * @param key the header name with surrounding whitespace trimmed
+         * @param value the header value with surrounding whitespace trimmed
+         */
+        public void onHeader(String key, String value);
+    }
+
+    /**
+     * Creates a parser that reports headers to the given callback.
+     *
+     * @param cb the receiver of parsed headers, or null to discard them
+     */
+    public HttpHeaderParser(Callback cb)
+    {
+        callback_ = cb;
+    }
+
+    enum HeaderParseState
+    {
+        // we are at the very beginning of the line
+        START_OF_HEADER_LINE,
+        // are at line beginning, read '\r' but ran out of bytes in this round
+        START_OF_HEADER_LINE_WITH_READ_SLASH_R,
+        // we are in the process of parsing a header key
+        IN_HEADER_KEY,
+        // eat whitespace after the ':' but before the value
+        PRE_HEADER_VALUE_WHITESPACE,
+        // we are in the process of parsing a header value
+        IN_HEADER_VALUE,
+        // were in IN_HEADER_VALUE and read '\r' but ran out of more bytes
+        IN_HEADER_VALUE_WITH_READ_SLASH_R,
+        /*
+         * got \r\n in the header value.  now consider whether it is a
+         * multi-line value.  For example,
+         *
+         * HeaderKey: HeaderValue\r\n this is still part of the value\r\n
+         *
+         * is a valid HTTP header line with value
+         *
+         * HeaderValue\r\n this is still part of the value
+         *
+         * NOTE: while all whitespace should generally be condensed into a
+         * single space by the HTTP standard, we will just preserve all of the
+         * whitespace for now
+         *
+         * TODO: consider replacing all whitespace with a single space
+         *
+         * TODO: this parser doesn't correctly preserve the \r\n, should it?
+         */
+        CHECKING_END_OF_VALUE,
+        // we are just about to reset the state of the header parser
+        TO_RESET
+    }
+
+    // the current state of the parser
+    private HeaderParseState parseState_ = HeaderParseState.TO_RESET;
+    // incrementally build up this HTTP header key as we read it
+    private StringBuilder headerKey_ = new StringBuilder(32);
+
+    // incrementally build up this HTTP header value as we read it
+    private StringBuilder headerValue_ = new StringBuilder(64);
+
+    /**
+     * Discards any partially read key/value and positions the parser at the
+     * start of a header line.
+     */
+    public void resetParserState()
+    {
+        headerKey_.setLength(0);
+        headerValue_.setLength(0);
+        parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+    }
+
+    // Reports the accumulated header (trimmed) to the callback, if any, and
+    // gets ready for the next header line.
+    private void finishCurrentHeader_()
+    {
+        if (callback_ != null)
+        {
+            callback_.onHeader(headerKey_.toString().trim(), headerValue_
+                    .toString().trim());
+        }
+        resetParserState();
+    }
+
+    /**
+     * Parses header bytes from a stream, one byte at a time, for as long as
+     * the stream reports available bytes.  The stream must support
+     * mark/reset because the parser un-reads bytes it has to look at twice.
+     *
+     * @param in the stream to read header bytes from
+     * @return true once the empty line ("\r\n" at the start of a header
+     *         line) that ends the header section has been consumed; false
+     *         if the available bytes ran out before that
+     * @throws IOException if reading from or resetting the stream fails
+     */
+    public boolean onMoreBytes(InputStream in) throws IOException
+    {
+        int got;
+
+        if (parseState_ == HeaderParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (in.available() > 0)
+        {
+            // remember this position so the byte can be pushed back
+            in.mark(1);
+            got = in.read();
+
+            switch (parseState_)
+            {
+
+            case START_OF_HEADER_LINE:
+                switch (got)
+                {
+                case '\r':
+                    if (in.available() > 0)
+                    {
+                        in.mark(1);
+                        got = in.read();
+
+                        if (got == '\n')
+                        {
+                            // blank line: the header section is complete
+                            parseState_ = HeaderParseState.TO_RESET;
+                            return true;
+                        } // TODO: determine whether this \r-eating is valid
+                        else
+                        {
+                            in.reset();
+                        }
+                    } // wait for more data to make this decision
+                    else
+                    {
+                        // un-read the '\r' so it is seen again on the next call
+                        in.reset();
+                        return false;
+                    }
+                    break;
+
+                default:
+                    // first byte of a key: push it back and parse it as such
+                    in.reset();
+                    parseState_ = HeaderParseState.IN_HEADER_KEY;
+                    break;
+                }
+                break;
+
+            case IN_HEADER_KEY:
+                switch (got)
+                {
+                case ':':
+                    parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                    break;
+                // TODO: find out: whether to eat whitespace before a :
+                default:
+                    headerKey_.append((char) got);
+                    break;
+                }
+                break;
+
+            case PRE_HEADER_VALUE_WHITESPACE:
+                switch (got)
+                {
+                case ' ':
+                case '\t':
+                    break;
+                default:
+                    in.reset();
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    break;
+                }
+                break;
+
+            case IN_HEADER_VALUE:
+                switch (got)
+                {
+                case '\r':
+                    if (in.available() > 0)
+                    {
+                        in.mark(1);
+                        got = in.read();
+
+                        if (got == '\n')
+                        {
+                            parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                            break;
+                        } // TODO: determine whether this \r-eating is valid
+                        else
+                        {
+                            in.reset();
+                        }
+                    }
+                    else
+                    {
+                        // un-read the '\r' so it is seen again on the next call
+                        in.reset();
+                        return false;
+                    }
+                    break;
+                default:
+                    headerValue_.append((char) got);
+                    break;
+                }
+                break;
+
+            case CHECKING_END_OF_VALUE:
+                switch (got)
+                {
+                case ' ':
+                case '\t':
+                    // continuation line: the whitespace belongs to the value
+                    in.reset();
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    break;
+                default:
+                    // the byte starts the next line; push it back and emit the header
+                    in.reset();
+                    finishCurrentHeader_();
+                }
+                break;
+            default:
+                assert false;
+                parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                break;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Parses header bytes from a buffer, starting at its current position.
+     * Bytes are read as unsigned values (0..255) so that they map to the
+     * same characters as in {@link #onMoreBytes(InputStream)}.
+     *
+     * @param buffer the buffer holding header bytes; on return its position
+     *        is just past the terminating blank line when true is returned,
+     *        and at its limit otherwise
+     * @return true once the empty line that ends the header section has been
+     *         consumed; false if the buffer ran out before that, in which
+     *         case the parser state is kept for the next buffer
+     * @throws IOException never thrown by this implementation; declared for
+     *         callers that already handle it
+     */
+    public boolean onMoreBytesNew(ByteBuffer buffer) throws IOException
+    {
+
+        int got;
+        int limit = buffer.limit();
+        int pos = buffer.position();
+
+        if (parseState_ == HeaderParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (pos < limit)
+        {
+            switch (parseState_)
+            {
+
+            case START_OF_HEADER_LINE:
+                if (buffer.get(pos) != '\r')
+                {
+                    parseState_ = HeaderParseState.IN_HEADER_KEY;
+                    break;
+                }
+                else
+                {
+                    pos++;
+                    if (pos == limit) // Need more bytes
+                    {
+                        buffer.position(pos);
+                        parseState_ = HeaderParseState.START_OF_HEADER_LINE_WITH_READ_SLASH_R;
+                        return false;
+                    }
+                }
+            // fall through
+
+            case START_OF_HEADER_LINE_WITH_READ_SLASH_R:
+                // Processed "...\r\n\r\n" - headers are complete
+                if (buffer.get(pos) == '\n')
+                {
+                    buffer.position(++pos);
+                    parseState_ = HeaderParseState.TO_RESET;
+                    return true;
+                } // TODO: determine whether this \r-eating is valid
+                else
+                {
+                    parseState_ = HeaderParseState.IN_HEADER_KEY;
+                }
+            //fall through
+
+            case IN_HEADER_KEY:
+                // TODO: find out: whether to eat whitespace before a :
+                // mask to 0..255: ByteBuffer.get() is signed and a plain cast
+                // would turn bytes >= 0x80 into chars in the 0xFF80..0xFFFF range
+                while (pos < limit && (got = buffer.get(pos) & 0xFF) != ':')
+                {
+                    headerKey_.append((char) got);
+                    pos++;
+                }
+                if (pos < limit)
+                {
+                    pos++; //eating ':'
+                    parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                }
+                break;
+
+            case PRE_HEADER_VALUE_WHITESPACE:
+                while (pos < limit
+                        && ((got = buffer.get(pos)) == ' ' || got == '\t'))
+                {
+                    pos++;
+                }
+                if (pos < limit)
+                {
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                }
+                break;
+
+            case IN_HEADER_VALUE:
+                // same unsigned masking as for the key
+                while (pos < limit && (got = buffer.get(pos) & 0xFF) != '\r')
+                {
+                    headerValue_.append((char) got);
+                    pos++;
+                }
+                if (pos == limit)
+                {
+                    break;
+                }
+
+                pos++; // eating '\r'
+                if (pos == limit)
+                {
+                    // the '\n' (if any) arrives with the next buffer
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE_WITH_READ_SLASH_R;
+                    break;
+                }
+            // fall through
+
+            case IN_HEADER_VALUE_WITH_READ_SLASH_R:
+                if (buffer.get(pos) == '\n')
+                {
+                    parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                    pos++;
+                } // TODO: determine whether this \r-eating is valid
+                else
+                {
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                }
+                break;
+
+            case CHECKING_END_OF_VALUE:
+                switch ((char) buffer.get(pos))
+                {
+                case ' ':
+                case '\t':
+                    // continuation line: keep reading the same value
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    break;
+
+                default:
+                    // Processed "headerKey headerValue\r\n"
+                    finishCurrentHeader_();
+                }
+                break;
+
+            default:
+                assert false;
+                parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                break;
+            }
+
+        }
+        // Need to read more bytes - get next buffer
+        buffer.position(pos);
+        return false;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author kranganathan
+ */
+
+public class HttpParsingException extends IOException
+{
+    private static final long serialVersionUID = 1L;
+}
+
+

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+/**
+ *
+ * @author kranganathan
+ */
+public interface HttpProtocolConstants
+{
+    /** Name of the Connection header. */
+    String CONNECTION = "Connection";
+
+    /** Name of the Content-Length header. */
+    String CONTENT_LENGTH = "Content-Length";
+
+    /** The "close" token. */
+    String CLOSE = "close";
+
+    /** The "Keep-Alive" token. */
+    String KEEP_ALIVE = "Keep-Alive";
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates a HTTP request.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.net.URLDecoder;
+import java.nio.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpRequest
+{
+    private final Map<String, String> headersMap_ = new HashMap<String, String>();
+    private final Map<String, String> paramsMap_ = new HashMap<String, String>();
+    private String sBody_ = "";
+    private String method_;
+    private String path_;
+    private String query_;
+    private String version_;
+
+    /*
+     * Returns the type of method - GET, POST, etc.
+     */
+    public String getMethod()
+    {
+        return method_;
+    }
+
+    /*
+     * Gets the request path referenced by the request.
+     * For example, if the URL is of the form:
+     *  http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *  "/some/path"
+     */
+    public String getPath()
+    {
+        return path_;
+    }
+
+    /*
+     * Gets the query in the request.
+     * For example, if the URL is of the form:
+     *  http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *  "param=value"
+     */
+    public String getQuery()
+    {
+        return query_;
+    }
+
+    /*
+     * Returns the supported HTTP protocol version. This is always
+     * "HTTP/1.1", regardless of the version passed to setStartLine().
+     */
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    /*
+     * This function adds to the map of header name-values
+     * in the HTTP request. A later value for the same name
+     * replaces the earlier one.
+     */
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    /*
+     * For a given name, returns the value if it was in the
+     * headers. Returns the empty string otherwise. The lookup
+     * uses the name exactly as it was passed to addHeader().
+     */
+    public String getHeader(String name)
+    {
+        String value = headersMap_.get(name);
+        return (value == null) ? "" : value;
+    }
+
+    /*
+     * Stores a parameter. The value is URL-decoded as UTF-8 first; if it
+     * cannot be decoded (for example a malformed %-escape) the raw value
+     * is stored instead.
+     */
+    public void setParameter(String name, String value)
+    {
+        String decodedValue = value;
+        if (value != null)
+        {
+            try
+            {
+                decodedValue = URLDecoder.decode(value, "UTF-8");
+            }
+            catch (IllegalArgumentException ignored)
+            {
+                // malformed escape sequence: deliberately keep the raw value
+            }
+            catch (UnsupportedEncodingException ignored)
+            {
+                // UTF-8 is always available; keep the raw value regardless
+            }
+        }
+        paramsMap_.put(name, decodedValue);
+    }
+
+    /*
+     * This function gets the parameters from the body of the HTTP message.
+     * Returns the value for the parameter if one exists or returns null.
+     *
+     * For example, if the body is of the form:
+     *  a=b&c=d
+     * this function will:
+     * return "b" when called as getParameter("a")
+     */
+    public String getParameter(String name)
+    {
+        return paramsMap_.get(name);
+    }
+
+    /*
+     * Appends the bytes remaining in the given buffers, decoded as UTF-8,
+     * to the body and then parses the whole body as '&'-separated
+     * name=value parameters. The buffers are consumed. A null list is
+     * ignored. A parameter without '=' or without a value is stored with
+     * the empty string as its value; only the first '=' of a pair
+     * separates the name from the value.
+     */
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        if(bodyBuffers == null)
+            return;
+
+        // gather all bytes first so that a multi-byte character split
+        // across two buffers is still decoded correctly
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        for(ByteBuffer bb : bodyBuffers)
+        {
+            if(bb == null || !bb.hasRemaining())
+            {
+                continue;
+            }
+            byte[] chunk = new byte[bb.remaining()];
+            bb.get(chunk);
+            bytes.write(chunk, 0, chunk.length);
+        }
+
+        try
+        {
+            sBody_ += bytes.toString("UTF-8");
+        }
+        catch(UnsupportedEncodingException e)
+        {
+            // every JVM is required to support UTF-8
+            throw new IllegalStateException("UTF-8 is not supported", e);
+        }
+
+        // once we are done with the body, parse the parameters
+        String[] sParamValues = sBody_.split("&");
+        for(int i = 0; i < sParamValues.length; ++i)
+        {
+            String pair = sParamValues[i];
+            if(pair.length() == 0)
+            {
+                continue;
+            }
+            // indexOf instead of split("="): split drops a trailing empty
+            // value, which used to abort parsing of all later parameters
+            int eq = pair.indexOf('=');
+            String paramName = (eq < 0) ? pair : pair.substring(0, eq);
+            String paramValue = (eq < 0) ? "" : pair.substring(eq + 1);
+            setParameter(paramName, paramValue);
+        }
+    }
+
+    public String getBody()
+    {
+        return sBody_;
+    }
+
+    /*
+     * Records the components of the request's start line.
+     */
+    public void setStartLine(String method, String path, String query, String version)
+    {
+        method_ = method;
+        path_ = path;
+        query_ = query;
+        version_ = version;
+    }
+
+    /*
+     * Returns a multi-line dump of the start line, headers and body.
+     */
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("HttpRequest-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", query = " + query_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpRequest");
+
+        return sw.toString();
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.nio.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpResponse
+{
+    private Map<String, String> headersMap_ = new HashMap<String, String>();;
+    private String sBody_ = null;
+    private String method_ = null;
+    private String path_ = null;
+    private String version_ = null;
+
+
+    public String getMethod()
+    {
+        return method_;
+    }
+    
+    public String getPath()
+    {
+        return path_;
+    }
+    
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    public String getHeader(String name)
+    {
+        return headersMap_.get(name).toString();
+    }
+
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        StringBuffer sb = new StringBuffer();
+        while(bodyBuffers.size() > 0)
+        {
+            sb.append(bodyBuffers.remove(0).asCharBuffer().toString());
+        }
+        sBody_ = sb.toString();
+    }
+    
+    public String getBody()
+    {
+        return sBody_;
+    }
+    
+    public void setStartLine(String method, String path, String version)
+    {
+        method_ = method;
+        path_ = path;
+        version_ = version;
+    }    
+
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        
+        pw.println("HttpResponse-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpResponse");
+        
+        return sw.toString();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpStartLineParser
+{
+    private Callback callback_;
+
+    /**
+     * Receives the components of a parsed start line.
+     */
+    public interface Callback
+    {
+        /**
+         * Invoked with the pieces of the start line.
+         *
+         * @param method the HTTP method
+         * @param path the request path
+         * @param query the query string
+         * @param version the HTTP version
+         */
+        public void onStartLine(String method, String path, String query, String version);
+    }
+    
+    /**
+     * Creates a parser that hands the parsed start line to the given callback.
+     *
+     * @param cb the receiver of the parsed start line; may be null
+     */
+    public HttpStartLineParser(Callback cb)
+    {
+        this.callback_ = cb;
+    }
+    
+    private enum StartLineParseState
+    {
+        // skipping spaces; the parser then continues in nextState_
+        EATING_WHITESPACE,
+        // accumulating the HTTP method up to the next space
+        READING_METHOD,
+        // accumulating the request path
+        READING_PATH,
+        // accumulating the query string that follows '?'
+        READING_QUERY,
+        // just read '%'; the next byte is the first hex digit of the escape
+        DECODING_FIRST_CHAR,
+        // the next byte is the second hex digit of the escape
+        DECODING_SECOND_CHAR,
+        // accumulating the HTTP version up to '\r'
+        READING_VERSION,
+        // a '\r' has been read
+        CHECKING_EOL,
+        // parser state must be reset before the next byte is consumed
+        TO_RESET
+    }
+    
+    private StartLineParseState parseState_ = StartLineParseState.TO_RESET;
+    private StartLineParseState nextState_;
+    private StringBuilder httpMethod_ = new StringBuilder(32);
+    private StringBuilder httpPath_ = new StringBuilder();
+    private StringBuilder httpQuery_ = new StringBuilder(32);
+    private StringBuilder httpVersion_ = new StringBuilder();
+    
+    // we will encode things of the form %{2 digit hex number} and this is a 
+    // temporary holder for the leftmost digit's value as the second digit is
+    // being read
+    private int encodedValue_;
+    // this is a pointer to one of httpMethod_, httpPath_, httpQuery_, 
+    // httpVersion_ so that the encoded value can be appended to the correct 
+    // buffer
+    private StringBuilder encodeTo_;
+
+    public void resetParserState() {
+        httpMethod_.setLength(0);
+        httpPath_.setLength(0);
+        httpQuery_.setLength(0);
+        httpVersion_.setLength(0);
+        
+        parseState_ = StartLineParseState.EATING_WHITESPACE;
+        nextState_ = StartLineParseState.READING_METHOD;
+    }
+    
+    // Hands the accumulated method, path, query and version to the
+    // callback; does nothing when no callback was supplied.
+    private void finishLine_()
+    {
+        if (callback_ == null)
+        {
+            return;
+        }
+
+        String method = httpMethod_.toString();
+        String path = httpPath_.toString();
+        String query = httpQuery_.toString();
+        String version = httpVersion_.toString();
+        callback_.onStartLine(method, path, query, version);
+    }
+    
+    // Maps an ASCII hex digit ('0'-'9', 'a'-'f', 'A'-'F') to its value
+    // 0..15; any other input maps to 0.
+    private static int decodeHex(int hex)
+    {
+        if ('0' <= hex && hex <= '9')
+        {
+            return hex - '0';
+        }
+        if ('a' <= hex && hex <= 'f')
+        {
+            return 10 + (hex - 'a');
+        }
+        if ('A' <= hex && hex <= 'F')
+        {
+            return 10 + (hex - 'A');
+        }
+        return 0;
+    }
+    
+    public boolean onMoreBytes(InputStream in) throws HttpParsingException, IOException
+    {
+        int got;
+
+        if (parseState_ == StartLineParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (in.available() > 0)
+        {
+            in.mark(1);
+            got = in.read();
+			
+            switch (parseState_)
+            {
+                case EATING_WHITESPACE:
+                        switch (got)
+                        {
+                            case ' ':
+                                        break;
+                            default:
+                                        in.reset();
+                                        parseState_ = nextState_;
+                                        break;
+                        }
+                        break;
+                    
+                case READING_METHOD:
+                        switch (got)
+                        {
+                            case ' ':
+                                    parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                    nextState_ = StartLineParseState.READING_PATH;
+                                    break;
+                            default:
+                                    httpMethod_.append((char) got);
+                                    break;
+                        }
+                        break;
+                        
+                case READING_PATH:
+                        switch (got)
+                        {
+                            case '\r':
+                                    parseState_ = StartLineParseState.CHECKING_EOL;
+                                    break;
+                            case '%':
+                                    encodeTo_ = httpPath_;
+                                    nextState_ = parseState_;
+                                    parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                                    break;
+                            case ' ':
+                                    parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                    nextState_ = StartLineParseState.READING_VERSION;
+                                    break;
+                            case '?':
+                                    parseState_ = StartLineParseState.READING_QUERY;
+                                    break;
+                            default:
+                                    httpPath_.append((char) got);
+                                    break;
+                        }
+                        break;
+                            
+                case READING_QUERY:
+                        switch (got)
+                        {
+                            case '\r':
+                                    parseState_ = StartLineParseState.CHECKING_EOL;
+                                    break;
+                            case '%':
+                                    encodeTo_ = httpQuery_;
+                                    nextState_ = parseState_;
+                                    parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                                    break;
+                            case ' ':
+                                    parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                    nextState_ = StartLineParseState.READING_VERSION;
+                                    break;
+                            case '+':
+                                    httpQuery_.append(' ');
+                                    break;
+                            default:
+                                    httpQuery_.append((char) got);
+                                    break;
+                        }
+                        break;
+                            
+                case DECODING_FIRST_CHAR:
+                        encodedValue_ = decodeHex(got) * 16;
+                        parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+                        break;
+
+                case DECODING_SECOND_CHAR:
+                        encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+                        parseState_ = nextState_;
+                        break;
+                                
+                case READING_VERSION:
+                        switch (got)
+                        {
+                            case '\r':
+                                    parseState_ = StartLineParseState.CHECKING_EOL;
+                                    break;
+                            default:
+                                    httpVersion_.append((char) got);
+                                    break;
+                        }
+                        break;
+                        
+                case CHECKING_EOL:
+                        switch (got)
+                        {
+                            case '\n':
+                                    finishLine_();
+                                    parseState_ = StartLineParseState.TO_RESET;
+                                    return true;
+                            default:
+                                    throw new HttpParsingException();
+                        }
+                        
+                default:
+                        throw new HttpParsingException();
+            }
+        }
+        
+        return false;
+    }
+    
+    /**
+     * Buffer-scanning variant of the start-line parser. Bytes are read by
+     * absolute index, from the buffer's position up to its limit, and fed
+     * through the StartLineParseState machine.
+     *
+     * @param buffer bytes of the request line; on a normal return its position
+     *               is moved past everything that was consumed
+     * @return true once the "\r\n" ending the line has been consumed, false if
+     *         the buffer ran out first (parser state is kept for the next call)
+     * @throws HttpParsingException if '\r' is not followed by '\n', or the
+     *         parser is in a state this method does not handle
+     */
+    public boolean onMoreBytesNew(ByteBuffer buffer) throws HttpParsingException, IOException
+    {
+        final int end = buffer.limit();
+        int cursor = buffer.position();
+        int octet;
+
+        /* A previous call finished a line; start over before parsing. */
+        if (StartLineParseState.TO_RESET == parseState_)
+        {
+            resetParserState();
+        }
+
+        while (cursor < end)
+        {
+            switch (parseState_)
+            {
+                case EATING_WHITESPACE:
+                        /* Skip blanks; only move on if a non-blank byte is in view. */
+                        while (cursor < end && buffer.get(cursor) == ' ')
+                        {
+                            cursor++;
+                        }
+                        if (cursor < end)
+                        {
+                            parseState_ = nextState_;
+                        }
+                        break;
+
+                case READING_METHOD:
+                        for (; cursor < end; cursor++)
+                        {
+                            octet = buffer.get(cursor);
+                            if (octet == ' ')
+                            {
+                                break;
+                            }
+                            httpMethod_.append((char) octet);
+                        }
+
+                        /* Stopped on a blank (left unconsumed for EATING_WHITESPACE). */
+                        if (cursor < end)
+                        {
+                            nextState_ = StartLineParseState.READING_PATH;
+                            parseState_ = StartLineParseState.EATING_WHITESPACE;
+                        }
+                        break;
+
+                case READING_PATH:
+                        while (cursor < end && parseState_ == StartLineParseState.READING_PATH)
+                        {
+                            octet = buffer.get(cursor++);
+
+                            if (octet == '\r')
+                            {
+                                parseState_ = StartLineParseState.CHECKING_EOL;
+                            }
+                            else if (octet == '%')
+                            {
+                                /* Percent escape: decode two hex digits, then come back here. */
+                                encodeTo_ = httpPath_;
+                                nextState_ = StartLineParseState.READING_PATH;
+                                parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                            }
+                            else if (octet == ' ')
+                            {
+                                nextState_ = StartLineParseState.READING_VERSION;
+                                parseState_ = StartLineParseState.EATING_WHITESPACE;
+                            }
+                            else if (octet == '?')
+                            {
+                                parseState_ = StartLineParseState.READING_QUERY;
+                            }
+                            else
+                            {
+                                httpPath_.append((char) octet);
+                            }
+                        }
+                        break;
+
+                case READING_QUERY:
+                        while (cursor < end && parseState_ == StartLineParseState.READING_QUERY)
+                        {
+                            octet = buffer.get(cursor++);
+
+                            if (octet == '\r')
+                            {
+                                parseState_ = StartLineParseState.CHECKING_EOL;
+                            }
+                            else if (octet == '%')
+                            {
+                                /* Percent escape: decode two hex digits, then come back here. */
+                                encodeTo_ = httpQuery_;
+                                nextState_ = StartLineParseState.READING_QUERY;
+                                parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                            }
+                            else if (octet == ' ')
+                            {
+                                nextState_ = StartLineParseState.READING_VERSION;
+                                parseState_ = StartLineParseState.EATING_WHITESPACE;
+                            }
+                            else if (octet == '+')
+                            {
+                                /* In the query string '+' stands for a blank. */
+                                httpQuery_.append(' ');
+                            }
+                            else
+                            {
+                                httpQuery_.append((char) octet);
+                            }
+                        }
+                        break;
+
+                case DECODING_FIRST_CHAR:
+                        /* High nibble of the escaped byte. */
+                        octet = buffer.get(cursor++);
+                        encodedValue_ = decodeHex(octet) * 16;
+                        parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+                        break;
+
+                case DECODING_SECOND_CHAR:
+                        /* Low nibble; emit the decoded character and resume. */
+                        octet = buffer.get(cursor++);
+                        encodeTo_.append((char) (decodeHex(octet) + encodedValue_));
+                        parseState_ = nextState_;
+                        break;
+
+                case READING_VERSION:
+                        for (; cursor < end; cursor++)
+                        {
+                            octet = buffer.get(cursor);
+                            if (octet == '\r')
+                            {
+                                break;
+                            }
+                            httpVersion_.append((char) octet);
+                        }
+                        if (cursor < end)
+                        {
+                            cursor++; // step over the '\r'
+                            parseState_ = StartLineParseState.CHECKING_EOL;
+                        }
+                        break;
+
+                case CHECKING_EOL:
+                        if (buffer.get(cursor++) != '\n')
+                        {
+                            throw new HttpParsingException();
+                        }
+                        finishLine_();
+                        parseState_ = StartLineParseState.TO_RESET;
+                        buffer.position(cursor);
+                        return true;  // the limit may have been reached here as well
+
+                default:
+                        throw new HttpParsingException();
+            }
+        }
+
+        /* Ran out of input mid-line: record how far we got. */
+        buffer.position(cursor);
+        return false;
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class writes the HTTP 1.1 responses back to the client.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.io.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpWriteResponse
+{
+    private HttpRequest httpRequest_ = null;
+    private StringBuilder body_ = new StringBuilder();
+
+    public HttpWriteResponse(HttpRequest httpRequest)
+    {
+        httpRequest_ = httpRequest;
+    }
+
+    public void println(String responseLine)
+    {
+        if(responseLine != null)
+        {
+            body_.append(responseLine);
+            body_.append( System.getProperty("line.separator"));
+        }
+    }
+
+    public ByteBuffer flush() throws Exception
+    {
+        StringBuilder sb = new StringBuilder();
+        // write out the HTTP response headers first
+        sb.append(httpRequest_.getVersion() + " 200 OK\r\n");
+        sb.append("Content-Type: text/html\r\n");
+        if(body_.length() > 0)
+        	sb.append("Content-Length: " + body_.length() + "\r\n");
+        sb.append("Cache-Control: no-cache\r\n");
+        sb.append("Pragma: no-cache\r\n");
+
+        // terminate the headers
+        sb.append("\r\n");
+
+        // now write out the HTTP response body
+        if(body_.length() > 0)
+            sb.append(body_.toString());
+
+        // terminate the body
+        //sb.append("\r\n");
+        //sb.append("\r\n");
+        ByteBuffer buffer = ByteBuffer.wrap(sb.toString().getBytes());
+        return buffer;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+
+import org.apache.cassandra.utils.FBUtilities;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that collects the 4-byte length prefix of a message and then
+ * hands over to the CONTENT state, telling it how many bytes to expect.
+ */
+class ContentLengthState extends StartState
+{
+    /* Holds exactly the 4 bytes of the length prefix. */
+    private ByteBuffer buffer_;
+
+    ContentLengthState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /**
+     * Delegates to doRead to fill the length buffer.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        return doRead(buffer_);
+    }
+
+    /**
+     * Decodes the length prefix and switches the reader to the CONTENT state,
+     * reusing the cached state object when the reader already has one.
+     *
+     * @throws IOException if the decoded length is negative. The value comes
+     *         straight off the wire; without this check it would surface as
+     *         an unchecked IllegalArgumentException when the content buffer
+     *         is sized.
+     */
+    public void morphState() throws IOException
+    {
+        int size = FBUtilities.byteArrayToInt(buffer_.array());        
+        if ( size < 0 )
+        {
+            /* Leave the buffer reusable even though this message is rejected. */
+            buffer_.clear();
+            throw new IOException("Invalid content length received: " + size);
+        }
+
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
+        if ( nextState == null )
+        {
+            nextState = new ContentState(stream_, size);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, nextState );
+        }
+        else
+        {               
+            nextState.setContextData(size);
+        }
+        stream_.morphState( nextState );
+        /* Ready the buffer for the next message's length prefix. */
+        buffer_.clear();
+    }
+    
+    /**
+     * Not supported: this state takes no context data.
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that collects the body of a message whose length is already
+ * known, then hands the bytes to the DONE state.
+ */
+class ContentState extends StartState
+{
+    /* Receives the body; may be larger than length_ when reused. */
+    private ByteBuffer buffer_;
+    /* Number of body bytes expected for the current message. */
+    private int length_;
+
+    ContentState(TcpReader stream, int length)
+    {
+        super(stream);
+        length_ = length;
+        buffer_ = ByteBuffer.allocate(length);
+    }
+
+    /**
+     * Delegates to doRead to fill the body buffer.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Switches the reader to the DONE state, passing it a copy of the body.
+     * The cached DONE state is reused when the reader already has one.
+     */
+    public void morphState() throws IOException
+    {
+        StartState doneState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( doneState != null )
+        {
+            doneState.setContextData(toBytes());
+        }
+        else
+        {
+            doneState = new DoneState(stream_, toBytes());
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, doneState );
+        }
+        stream_.morphState( doneState );
+    }
+
+    /**
+     * Copies the first length_ bytes of the buffer into a new array.
+     */
+    private byte[] toBytes()
+    {
+        buffer_.position(0);
+        byte[] payload = new byte[length_];
+        buffer_.get(payload);
+        return payload;
+    }
+
+    /**
+     * Prepares this state for a new message.
+     *
+     * @param data an Integer holding the length of the next body
+     */
+    public void setContextData(Object data)
+    {
+        length_ = (Integer)data;
+        buffer_.clear();
+        if ( length_ <= buffer_.capacity() )
+        {
+            /* Existing buffer is big enough: just cap it at the new length. */
+            buffer_.limit(length_);
+        }
+        else
+        {
+            buffer_ = ByteBuffer.allocate(length_);
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.cassandra.db.Table;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+class ContentStreamState extends StartState
+{       
+    private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
+    private static long count_ = 64*1024*1024;
+    /* Return this byte array to exit event loop */
+    private static byte[] bytes_ = new byte[1];
+    private long bytesRead_ = 0L;
+    private FileChannel fc_;
+    private StreamContextManager.StreamContext streamContext_;
+    private StreamContextManager.StreamStatus streamStatus_;
+    
+    ContentStreamState(TcpReader stream)
+    {
+        super(stream); 
+        SocketChannel socketChannel = stream.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();        
+        streamContext_ = StreamContextManager.getStreamContext(remoteHost);   
+        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+    }
+    
+    private void createFileChannel() throws IOException
+    {
+        if ( fc_ == null )
+        {
+            logger_.debug("Creating file for " + streamContext_.getTargetFile());
+            FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
+            fc_ = fos.getChannel();            
+        }
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        SocketChannel socketChannel = stream_.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();  
+        createFileChannel();
+        if ( streamContext_ != null )
+        {  
+            try
+            {
+                bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
+                if ( bytesRead_ != streamContext_.getExpectedBytes() )
+                    throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+            }
+            catch ( IOException ex )
+            {
+                /* Ask the source node to re-stream this file. */
+                streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);                
+                handleStreamCompletion(remoteHost);
+                /* Delete the orphaned file. */
+                File file = new File(streamContext_.getTargetFile());
+                file.delete();
+                throw ex;
+            }
+            if ( bytesRead_ == streamContext_.getExpectedBytes() )
+            {       
+                logger_.debug("Removing stream context " + streamContext_);                 
+                handleStreamCompletion(remoteHost);                              
+                bytesRead_ = 0L;
+                fc_.close();
+                morphState();
+            }                            
+        }
+        
+        return new byte[0];
+    }
+    
+    private void handleStreamCompletion(String remoteHost) throws IOException
+    {
+        /* 
+         * Streaming is complete. If all the data that has to be received inform the sender via 
+         * the stream completion callback so that the source may perform the requisite cleanup. 
+        */
+        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        if ( streamComplete != null )
+        {                    
+            streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);                    
+        }
+    }
+
+    public void morphState() throws IOException
+    {        
+        /* We instantiate an array of size 1 so that we can exit the event loop of the read. */                
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( nextState == null )
+        {
+            nextState = new DoneState(stream_, ContentStreamState.bytes_);
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+        }
+        else
+        {
+            nextState.setContextData(ContentStreamState.bytes_);
+        }
+        stream_.morphState( nextState );  
+    }
+    
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Terminal reader state: holds the bytes of a completed read and hands them
+ * out on the next call to read.
+ */
+class DoneState extends StartState
+{
+    /* Result to return from read(); replaced through setContextData on reuse. */
+    private byte[] bytes_ = new byte[0];
+
+    DoneState(TcpReader stream, byte[] bytes)
+    {
+        super(stream);
+        this.bytes_ = bytes;
+    }
+
+    /**
+     * Clears the reader's current state and returns the stored bytes.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        this.morphState();
+        return this.bytes_;
+    }
+
+    /**
+     * There is no state after this one, so the reader is given null.
+     */
+    public void morphState() throws IOException
+    {
+        stream_.morphState(null);
+    }
+
+    /**
+     * @param data the byte[] to return from the next read
+     */
+    public void setContextData(Object data)
+    {
+        this.bytes_ = (byte[])data;
+    }
+}